diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index a1bf40a0ece224..699306e1f8c8f7 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -58,18 +58,18 @@ AgentServer::AgentServer(ExecEnv* exec_env,
     // clean dpp download dir
     _command_executor = new CommandExecutor();
-    vector<OLAPRootPathStat> root_paths_stat;
-    _command_executor->get_all_root_path_stat(&root_paths_stat);
-    for (auto root_path_stat : root_paths_stat) {
+    vector<RootPathInfo> root_paths_info;
+    _command_executor->get_all_root_path_info(&root_paths_info);
+    for (auto root_path_info : root_paths_info) {
         try {
-            string dpp_download_path_str = root_path_stat.root_path + DPP_PREFIX;
+            string dpp_download_path_str = root_path_info.path + DPP_PREFIX;
             boost::filesystem::path dpp_download_path(dpp_download_path_str);
             if (boost::filesystem::exists(dpp_download_path)) {
                 boost::filesystem::remove_all(dpp_download_path);
             }
         } catch (...) {
             OLAP_LOG_WARNING("boost exception when remove dpp download path. [path='%s']",
-                             root_path_stat.root_path.c_str());
+                             root_path_info.path.c_str());
         }
     }
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 62fe5446e72f5d..fec026a3682e88 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1541,19 +1541,19 @@ void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this)
         }
 #endif
 
-        vector<OLAPRootPathStat> root_paths_stat;
+        vector<RootPathInfo> root_paths_info;
 
-        worker_pool_this->_command_executor->get_all_root_path_stat(&root_paths_stat);
+        worker_pool_this->_command_executor->get_all_root_path_info(&root_paths_info);
 
         map<string, TDisk> disks;
-        for (auto root_path_state : root_paths_stat) {
+        for (auto root_path_info : root_paths_info) {
             TDisk disk;
-            disk.__set_root_path(root_path_state.root_path);
-            disk.__set_disk_total_capacity(static_cast<double>(root_path_state.disk_total_capacity));
-            disk.__set_data_used_capacity(static_cast<double>(root_path_state.data_used_capacity));
-            disk.__set_disk_available_capacity(static_cast<double>(root_path_state.disk_available_capacity));
-            disk.__set_used(root_path_state.is_used);
-            disks[root_path_state.root_path] = disk;
+            disk.__set_root_path(root_path_info.path);
+            disk.__set_disk_total_capacity(static_cast<double>(root_path_info.capacity));
+            disk.__set_data_used_capacity(static_cast<double>(root_path_info.data_used_capacity));
+            disk.__set_disk_available_capacity(static_cast<double>(root_path_info.available));
+            disk.__set_used(root_path_info.is_used);
+            disks[root_path_info.path] = disk;
         }
         request.__set_disks(disks);
diff --git a/be/src/exec/csv_scan_node.cpp b/be/src/exec/csv_scan_node.cpp
index 64920b922558e4..5365a83ca7eb02 100644
--- a/be/src/exec/csv_scan_node.cpp
+++ b/be/src/exec/csv_scan_node.cpp
@@ -25,6 +25,7 @@
 #include
 
 #include "exec/text_converter.hpp"
+#include "exprs/hll_hash_function.h"
 #include "gen_cpp/PlanNodes_types.h"
 #include "runtime/runtime_state.h"
 #include "runtime/row_batch.h"
@@ -652,19 +653,17 @@ void CsvScanNode::hll_hash(const char* src, int len, std::string* result) {
     std::string str(src, len);
     if (str != "\\N") {
         uint64_t hash = HashUtil::murmur_hash64A(src, len, HashUtil::MURMUR_SEED);
-        char buf[11];
-        memset(buf, 0, 11);
+        char buf[HllHashFunctions::HLL_INIT_EXPLICT_SET_SIZE];
         // explicit set
         buf[0] = HLL_DATA_EXPLICIT;
         buf[1] = 1;
         *((uint64_t*)(buf + 2)) = hash;
-        *result = std::string(buf, 11);
+        *result = std::string(buf, sizeof(buf));
     } else {
-        char buf[2];
-        memset(buf, 0, 2);
+        char buf[HllHashFunctions::HLL_EMPTY_SET_SIZE];
         // empty set
         buf[0] = HLL_DATA_EMPTY;
-        *result = std::string(buf, 2);
+        *result = std::string(buf, sizeof(buf));
     }
 }
diff --git a/be/src/exprs/hll_hash_function.cpp b/be/src/exprs/hll_hash_function.cpp
index eeefc2023ed732..acb380d3305c74 100644
--- a/be/src/exprs/hll_hash_function.cpp
+++ b/be/src/exprs/hll_hash_function.cpp
@@ -34,26 +34,31 @@ namespace palo {
 using palo_udf::BigIntVal;
 using palo_udf::StringVal;
-
+
+const int HllHashFunctions::HLL_INIT_EXPLICT_SET_SIZE;
+const int HllHashFunctions::HLL_EMPTY_SET_SIZE;
+
 void HllHashFunctions::init() {
 }
 
 StringVal HllHashFunctions::create_string_result(palo_udf::FunctionContext* ctx,
                                                  const StringVal& val, const bool is_null) {
-    std::string result;
+    StringVal result;
     if (is_null) {
         // HLL_DATA_EMPTY
-        result = "0";
+        char buf[HLL_EMPTY_SET_SIZE];
+        buf[0] = HLL_DATA_EMPTY;
+        result = AnyValUtil::from_buffer_temp(ctx, buf, sizeof(buf));
     } else {
         // HLL_DATA_EXPLICIT
         uint64_t hash = HashUtil::murmur_hash64A(val.ptr, val.len, HashUtil::MURMUR_SEED);
-        char buf[10];
+        char buf[HLL_INIT_EXPLICT_SET_SIZE];
         buf[0] = HLL_DATA_EXPLICIT;
         buf[1] = 1;
         *((uint64_t*)(buf + 2)) = hash;
-        result = std::string(buf, 10);
-    }
-    return AnyValUtil::from_buffer_temp(ctx, result.c_str(), result.length());
+        result = AnyValUtil::from_buffer_temp(ctx, buf, sizeof(buf));
+    }
+    return result;
 }
 
 StringVal HllHashFunctions::hll_hash(palo_udf::FunctionContext* ctx,
diff --git a/be/src/exprs/hll_hash_function.h b/be/src/exprs/hll_hash_function.h
index dd150b247bdf00..3dc4ff71ccdce2 100644
--- a/be/src/exprs/hll_hash_function.h
+++ b/be/src/exprs/hll_hash_function.h
@@ -42,6 +42,9 @@ class HllHashFunctions {
                                 const palo_udf::StringVal& dest_base);
     static StringVal create_string_result(palo_udf::FunctionContext* ctx,
                                           const StringVal& str, const bool is_null);
+
+    // sizes are visible here so they can be used as array bounds in other translation units
+    static const int HLL_INIT_EXPLICT_SET_SIZE = 10;  // 1 byte type + 1 byte count + 8 bytes hash
+    static const int HLL_EMPTY_SET_SIZE = 1;          // 1 byte type
 };
 
 }
diff --git a/be/src/olap/command_executor.cpp b/be/src/olap/command_executor.cpp
index 737e8cf61ecbe2..8e4dd06dc56fd3 100755
--- a/be/src/olap/command_executor.cpp
+++ b/be/src/olap/command_executor.cpp
@@ -696,18 +696,18 @@ OLAPStatus CommandExecutor::cancel_delete(const TCancelDeleteDataReq& request) {
     return res;
 }
 
-OLAPStatus CommandExecutor::get_all_root_path_stat(
-        std::vector<OLAPRootPathStat>* root_paths_stat) {
-    OLAP_LOG_INFO("begin to process get all root path stat.");
+OLAPStatus CommandExecutor::get_all_root_path_info(
+        std::vector<RootPathInfo>* root_paths_info) {
+    OLAP_LOG_INFO("begin to process get all root path info.");
     OLAPStatus res = OLAP_SUCCESS;
 
-    res = OLAPRootPath::get_instance()->get_all_root_path_stat(root_paths_stat);
+    res = OLAPRootPath::get_instance()->get_all_root_path_info(root_paths_info);
     if (res != OLAP_SUCCESS) {
-        OLAP_LOG_WARNING("fail to process get all root path stat. [res=%d]", res);
+        OLAP_LOG_WARNING("fail to process get all root path info. [res=%d]", res);
         return res;
     }
 
-    OLAP_LOG_INFO("success to process get all root path stat.");
+    OLAP_LOG_INFO("success to process get all root path info.");
     return res;
 }
diff --git a/be/src/olap/command_executor.h b/be/src/olap/command_executor.h
index c1b00c174989a3..0c0027b55baacb 100644
--- a/be/src/olap/command_executor.h
+++ b/be/src/olap/command_executor.h
@@ -210,7 +210,7 @@ class CommandExecutor {
     //
    // @param root_paths_stat each root path stat including total/used/available capacity
     // @return error code
-    virtual OLAPStatus get_all_root_path_stat(std::vector<OLAPRootPathStat>* root_paths_stat);
+    virtual OLAPStatus get_all_root_path_info(std::vector<RootPathInfo>* root_paths_info);
 
 private:
     // Create initial base and delta version.
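For reference, the explicit/empty HLL serialization that csv_scan_node.cpp and hll_hash_function.cpp now share can be sketched as two standalone helpers. This is a minimal sketch: the kHllData* values below are assumptions for illustration (the real HLL_DATA_EMPTY / HLL_DATA_EXPLICIT constants come from the olap headers), and the names are not from the patch.

#include <cstdint>
#include <cstring>
#include <string>

static const char kHllDataEmpty = 0;        // assumed value of HLL_DATA_EMPTY
static const char kHllDataExplicit = 1;     // assumed value of HLL_DATA_EXPLICIT
static const int kHllExplicitSetSize = 10;  // 1 byte type + 1 byte count + 8 bytes hash
static const int kHllEmptySetSize = 1;      // 1 byte type

// Serialize a single-element explicit HLL set: [type][count=1][64-bit hash].
std::string serialize_explicit_hll(uint64_t hash) {
    char buf[kHllExplicitSetSize];
    buf[0] = kHllDataExplicit;
    buf[1] = 1;                            // number of explicit elements
    memcpy(buf + 2, &hash, sizeof(hash));  // memcpy avoids the unaligned store the patch does via a cast
    return std::string(buf, sizeof(buf));
}

// Serialize an empty HLL set: a single type byte.
std::string serialize_empty_hll() {
    char buf[kHllEmptySetSize];
    buf[0] = kHllDataEmpty;
    return std::string(buf, sizeof(buf));
}

Using sizeof(buf) instead of repeated magic numbers, as the patch does, keeps the buffer size and the string length from drifting apart.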
diff --git a/be/src/olap/olap_engine.cpp b/be/src/olap/olap_engine.cpp
index 461f7943643172..f4989c52a2bc37 100644
--- a/be/src/olap/olap_engine.cpp
+++ b/be/src/olap/olap_engine.cpp
@@ -260,13 +260,13 @@ OLAPStatus OLAPEngine::init() {
     }
 
     // initialize the cumulative compaction (CE) scheduler
-    vector<OLAPRootPathStat> all_root_paths_stat;
-    OLAPRootPath::get_instance()->get_all_disk_stat(&all_root_paths_stat);
-    _cumulative_compaction_disk_stat.reserve(all_root_paths_stat.size());
-    for (uint32_t i = 0; i < all_root_paths_stat.size(); i++) {
-        const OLAPRootPathStat& stat = all_root_paths_stat[i];
-        _cumulative_compaction_disk_stat.emplace_back(stat.root_path, i, stat.is_used);
-        _disk_id_map[stat.root_path] = i;
+    vector<RootPathInfo> all_root_paths_info;
+    OLAPRootPath::get_instance()->get_all_root_path_info(&all_root_paths_info);
+    _cumulative_compaction_disk_stat.reserve(all_root_paths_info.size());
+    for (uint32_t i = 0; i < all_root_paths_info.size(); i++) {
+        const RootPathInfo& info = all_root_paths_info[i];
+        _cumulative_compaction_disk_stat.emplace_back(info.path, i, info.is_used);
+        _disk_id_map[info.path] = i;
     }
     int32_t cumulative_compaction_num_threads = config::cumulative_compaction_num_threads;
     int32_t base_compaction_num_threads = config::base_compaction_num_threads;
@@ -1044,11 +1044,11 @@ void OLAPEngine::start_cumulative_priority() {
 
     // determine whether to select candidate or not
     bool is_select = false;
-    vector<OLAPRootPathStat> all_root_paths_stat;
-    OLAPRootPath::get_instance()->get_all_disk_stat(&all_root_paths_stat);
-    for (uint32_t i = 0; i < all_root_paths_stat.size(); i++) {
-        uint32_t disk_id = _disk_id_map[all_root_paths_stat[i].root_path];
-        _cumulative_compaction_disk_stat[disk_id].is_used = all_root_paths_stat[i].is_used;
+    vector<RootPathInfo> all_root_paths_info;
+    OLAPRootPath::get_instance()->get_all_root_path_info(&all_root_paths_info);
+    for (uint32_t i = 0; i < all_root_paths_info.size(); i++) {
+        uint32_t disk_id = _disk_id_map[all_root_paths_info[i].path];
+        _cumulative_compaction_disk_stat[disk_id].is_used = all_root_paths_info[i].is_used;
     }
 
     for (auto& disk : _cumulative_compaction_disk_stat) {
@@ -1126,8 +1126,8 @@ OLAPStatus OLAPEngine::start_trash_sweep(double* usage) {
     const uint32_t snapshot_expire = config::snapshot_expire_time_sec;
     const uint32_t trash_expire = config::trash_file_expire_time_sec;
     const double guard_space = config::disk_capacity_insufficient_percentage / 100.0;
-    std::vector<OLAPRootPathStat> disks_stat;
-    res = OLAPRootPath::get_instance()->get_all_disk_stat(&disks_stat);
+    std::vector<RootPathInfo> root_paths_info;
+    res = OLAPRootPath::get_instance()->get_all_root_path_info(&root_paths_info);
     if (res != OLAP_SUCCESS) {
         OLAP_LOG_WARNING("failed to get root path stat info when sweep trash.");
         return res;
@@ -1141,17 +1141,17 @@ OLAPStatus OLAPEngine::start_trash_sweep(double* usage) {
     }
     const time_t local_now = mktime(&local_tm_now); // get the local calendar time
 
-    for (OLAPRootPathStat& stat : disks_stat) {
-        if (!stat.is_used) {
+    for (RootPathInfo& info : root_paths_info) {
+        if (!info.is_used) {
             continue;
         }
 
-        double curr_usage = (stat.disk_total_capacity - stat.disk_available_capacity)
-                / (double) stat.disk_total_capacity;
+        double curr_usage = info.data_used_capacity
+                / (double) info.capacity;
         *usage = *usage > curr_usage ? *usage : curr_usage;
 
         OLAPStatus curr_res = OLAP_SUCCESS;
-        string snapshot_path = stat.root_path + SNAPSHOT_PREFIX;
+        string snapshot_path = info.path + SNAPSHOT_PREFIX;
         curr_res = _do_sweep(snapshot_path, local_now, snapshot_expire);
         if (curr_res != OLAP_SUCCESS) {
             OLAP_LOG_WARNING("failed to sweep snapshot. [path=%s, err_code=%d]",
@@ -1159,7 +1159,7 @@ OLAPStatus OLAPEngine::start_trash_sweep(double* usage) {
             res = curr_res;
         }
 
-        string trash_path = stat.root_path + TRASH_PREFIX;
+        string trash_path = info.path + TRASH_PREFIX;
         curr_res = _do_sweep(trash_path, local_now, curr_usage > guard_space ? 0 : trash_expire);
         if (curr_res != OLAP_SUCCESS) {
diff --git a/be/src/olap/olap_rootpath.cpp b/be/src/olap/olap_rootpath.cpp
index f9ad50fc705f74..b8c37fffb1d04d 100644
--- a/be/src/olap/olap_rootpath.cpp
+++ b/be/src/olap/olap_rootpath.cpp
@@ -155,6 +155,7 @@ OLAPStatus OLAPRootPath::init() {
 
     for (size_t i = 0; i < root_path_vec.size(); ++i) {
         RootPathInfo root_path_info;
+        root_path_info.path = root_path_vec[i];
         root_path_info.is_used = true;
         root_path_info.capacity = capacity_vec[i];
         res = _update_root_path_info(root_path_vec[i], &root_path_info);
@@ -244,56 +245,27 @@ void OLAPRootPath::get_all_available_root_path(RootPathVec* all_available_root_p
     _mutex.unlock();
 }
 
-OLAPStatus OLAPRootPath::get_all_disk_stat(vector<OLAPRootPathStat>* disks_stat) {
+OLAPStatus OLAPRootPath::get_all_root_path_info(vector<RootPathInfo>* root_paths_info) {
     OLAPStatus res = OLAP_SUCCESS;
-    disks_stat->clear();
+    root_paths_info->clear();
 
     _mutex.lock();
     for (RootPathMap::iterator it = _root_paths.begin(); it != _root_paths.end(); ++it) {
-        OLAPRootPathStat stat;
-        stat.root_path = it->first;
-        stat.is_used = it->second.is_used;
-
-        disks_stat->push_back(stat);
-    }
-    _mutex.unlock();
-
-    for (auto& stat : *disks_stat) {
-        if (stat.is_used) {
-            _get_disk_capacity(stat.root_path, &stat.disk_total_capacity, &stat.disk_available_capacity);
-        } else {
-            stat.disk_total_capacity = 0;
-            stat.disk_available_capacity = 0;
-            stat.data_used_capacity = 0;
-        }
-    }
-
-    return res;
-}
-
-OLAPStatus OLAPRootPath::get_all_root_path_stat(vector<OLAPRootPathStat>* root_paths_stat) {
-    OLAPStatus res = OLAP_SUCCESS;
-    root_paths_stat->clear();
-
-    _mutex.lock();
-    for (RootPathMap::iterator it = _root_paths.begin(); it != _root_paths.end(); ++it) {
-        OLAPRootPathStat stat;
-        stat.root_path = it->first;
-        stat.is_used = it->second.is_used;
-        stat.disk_total_capacity = it->second.capacity;
-
-        root_paths_stat->push_back(stat);
+        RootPathInfo info;
+        info.path = it->first;
+        info.is_used = it->second.is_used;
+        info.capacity = it->second.capacity;
+        root_paths_info->push_back(info);
     }
     _mutex.unlock();
 
-    for (auto& stat : *root_paths_stat) {
-        if (stat.is_used) {
-            _get_disk_capacity(stat.root_path, &stat.disk_total_capacity, &stat.disk_available_capacity);
-            _get_root_path_capacity(stat.root_path, &stat.data_used_capacity);
+    for (auto& info : *root_paths_info) {
+        if (info.is_used) {
+            _get_root_path_capacity(info.path, &info.data_used_capacity, &info.available);
         } else {
-            stat.disk_total_capacity = 0;
-            stat.data_used_capacity = 0;
-            stat.disk_available_capacity = 0;
+            info.capacity = 1;
+            info.data_used_capacity = 0;
+            info.available = 0;
         }
     }
 
     return res;
 }
@@ -340,6 +312,7 @@ OLAPStatus OLAPRootPath::reload_root_paths(const char* root_paths) {
         RootPathMap::iterator iter_root_path = _root_paths.find(root_path_vec[i]);
         if (iter_root_path == _root_paths.end()) {
             RootPathInfo root_path_info;
+            root_path_info.path = root_path_vec[i];
             root_path_info.is_used = true;
             root_path_info.capacity = capacity_vec[i];
             root_path_to_be_loaded.push_back(root_path_vec[i]);
@@ -716,7 +689,8 @@ OLAPStatus OLAPRootPath::parse_root_paths_from_string(
 
 OLAPStatus OLAPRootPath::_get_root_path_capacity(
         const string& root_path,
-        int64_t* data_used) {
+        int64_t* data_used,
+        int64_t* disk_available) {
     OLAPStatus res = OLAP_SUCCESS;
     int64_t used = 0;
 
@@ -729,28 +703,10 @@ OLAPStatus OLAPRootPath::_get_root_path_capacity(
             }
         }
         *data_used = used;
-    } catch (boost::filesystem::filesystem_error& e) {
-        OLAP_LOG_WARNING("get space info failed. [path: %s, error: %s]", root_path.c_str(), e.what());
-        return OLAP_ERR_STL_ERROR;
-    }
-
-    return res;
-}
-
-OLAPStatus OLAPRootPath::_get_disk_capacity(
-        const string& root_path,
-        int64_t* capacity,
-        int64_t* available) {
-    OLAPStatus res = OLAP_SUCCESS;
-
-    *capacity = 0;
-    *available = 0;
-
-    try {
+
         boost::filesystem::path path_name(root_path);
         boost::filesystem::space_info path_info = boost::filesystem::space(path_name);
-        *capacity = path_info.capacity;
-        *available = path_info.available;
+        *disk_available = path_info.available;
     } catch (boost::filesystem::filesystem_error& e) {
         OLAP_LOG_WARNING("get space info failed. [path: %s, error: %s]", root_path.c_str(), e.what());
         return OLAP_ERR_STL_ERROR;
     }
@@ -1137,16 +1093,16 @@ OLAPStatus OLAPRootPath::_get_cluster_id_path_vec(
         vector<string>* cluster_id_path_vec) {
     OLAPStatus res = OLAP_SUCCESS;
 
-    vector<OLAPRootPathStat> root_path_stat_vec;
-    res = get_all_root_path_stat(&root_path_stat_vec);
+    vector<RootPathInfo> root_path_info_vec;
+    res = get_all_root_path_info(&root_path_info_vec);
     if (res != OLAP_SUCCESS) {
-        OLAP_LOG_WARNING("fail to get root path stat info. [res=%d]", res);
+        OLAP_LOG_WARNING("fail to get root path info. [res=%d]", res);
         return res;
     }
 
-    for (const auto& stat : root_path_stat_vec) {
-        if (stat.is_used) {
-            cluster_id_path_vec->push_back(stat.root_path + CLUSTER_ID_PREFIX);
+    for (const auto& info : root_path_info_vec) {
+        if (info.is_used) {
+            cluster_id_path_vec->push_back(info.path + CLUSTER_ID_PREFIX);
         }
     }
diff --git a/be/src/olap/olap_rootpath.h b/be/src/olap/olap_rootpath.h
index 7221a1fd4dc92d..222d10b3ae9159 100644
--- a/be/src/olap/olap_rootpath.h
+++ b/be/src/olap/olap_rootpath.h
@@ -32,20 +32,29 @@
 
 namespace palo {
 
-struct OLAPRootPathStat {
-    OLAPRootPathStat():
-            disk_total_capacity(0),
-            data_used_capacity(0),
-            disk_available_capacity(0),
-            is_used(false) {}
-
-    std::string root_path;
-    int64_t disk_total_capacity;
+struct RootPathInfo {
+    RootPathInfo():
+            capacity(1),
+            available(0),
+            data_used_capacity(0),
+            current_shard(0),
+            is_used(false),
+            to_be_deleted(false) {}
+
+    std::string path;
+    std::string file_system;                // disk partition that holds this directory
+    std::string unused_flag_file;           // name of the file that flags this path as unusable
+    int64_t capacity;                       // total capacity, in bytes
+    int64_t available;                      // available capacity, in bytes
     int64_t data_used_capacity;
-    int64_t disk_available_capacity;
-    bool is_used;
+    uint64_t current_shard;                 // shards are numbered 0, 1, ...; this is the largest shard id
+    bool is_used;                           // whether this root path is usable
+    bool to_be_deleted;                     // deletion flag, e.g. for a root path removed during reload
+    TStorageMedium::type storage_medium;    // storage medium type: SSD|HDD
+    std::set table_set;
 };
+
 /*
  * The so-called RootPath currently refers to storage_root_path; its directory layout is:
 *
@@ -84,8 +93,7 @@ class OLAPRootPath {
     void get_all_available_root_path(RootPathVec* all_available_root_path);
 
     // @brief get info of all root paths
-    OLAPStatus get_all_disk_stat(std::vector<OLAPRootPathStat>* disks_stat);
-    OLAPStatus get_all_root_path_stat(std::vector<OLAPRootPathStat>* root_paths_stat);
+    OLAPStatus get_all_root_path_info(std::vector<RootPathInfo>* root_paths_info);
 
     // @brief reload root_paths info; this is a full (non-incremental) operation.
     //        a newly added root_path is handled the same way as in init
@@ -142,25 +150,6 @@ class OLAPRootPath {
     std::atomic_bool is_report_olap_table_already;
 
 private:
-    struct RootPathInfo {
-        RootPathInfo():
-                capacity(0),
-                available(0),
-                current_shard(0),
-                is_used(false),
-                to_be_deleted(false) {}
-
-        std::string file_system;                // disk partition that holds this directory
-        std::string unused_flag_file;           // name of the file that flags this path as unusable
-        int64_t capacity;                       // total capacity, in bytes
-        int64_t available;                      // available capacity, in bytes
-        uint64_t current_shard;                 // shards are numbered 0, 1, ...; this is the largest shard id
-        bool is_used;                           // whether this root path is usable
-        bool to_be_deleted;                     // deletion flag, e.g. for a root path removed during reload
-        TStorageMedium::type storage_medium;    // storage medium type: SSD|HDD
-        std::set table_set;
-    };
-
     typedef std::map<std::string, RootPathInfo> RootPathMap;
 
     // disk health check, mainly by periodically reading and writing 4KB of test data
@@ -177,12 +166,8 @@ class OLAPRootPath {
 
     OLAPStatus _get_root_path_capacity(
             const std::string& root_path,
-            int64_t* data_used);
-
-    OLAPStatus _get_disk_capacity(
-            const std::string& root_path,
-            int64_t* capacity,
-            int64_t* available);
+            int64_t* data_used,
+            int64_t* disk_available);
 
     OLAPStatus _get_root_path_file_system(const std::string& root_path, std::string* file_system);
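The two overlapping structs (the public OLAPRootPathStat and the previously private RootPathInfo) are merged into the single public RootPathInfo above. As a rough sketch of how a caller consumes the unified struct, here is the usage computation that start_trash_sweep switches to; the trimmed struct and the function are illustrative stand-ins, not the real definitions from olap_rootpath.h:

#include <algorithm>
#include <cstdint>
#include <string>
#include <vector>

// Trimmed to the fields this sketch needs.
struct RootPathInfoSketch {
    std::string path;
    int64_t capacity = 1;            // total bytes; defaults to 1, not 0, so usage math cannot divide by zero
    int64_t available = 0;           // free bytes on the partition
    int64_t data_used_capacity = 0;  // bytes occupied by palo data
    bool is_used = false;            // whether the path is currently usable
};

double max_data_usage(const std::vector<RootPathInfoSketch>& infos) {
    double max_usage = 0.0;
    for (const auto& info : infos) {
        if (!info.is_used) {
            continue;  // unusable paths report zeroed stats, as in get_all_root_path_info
        }
        // the patch switches from (capacity - available) / capacity to
        // data_used_capacity / capacity, i.e. it measures palo data only
        double usage = info.data_used_capacity / static_cast<double>(info.capacity);
        max_usage = std::max(max_usage, usage);
    }
    return max_usage;
}

Note the capacity(1) default in the constructor: it pairs with the else branch in get_all_root_path_info so that a broken path never yields a division by zero.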
diff --git a/be/src/runtime/tmp_file_mgr.cc b/be/src/runtime/tmp_file_mgr.cc
index bb24adc1d77fec..c39d1d8fe21d5c 100644
--- a/be/src/runtime/tmp_file_mgr.cc
+++ b/be/src/runtime/tmp_file_mgr.cc
@@ -30,6 +30,7 @@
 // #include
 // #include
 
+#include "olap/olap_rootpath.h"
 #include "util/debug_util.h"
 #include "util/disk_info.h"
 #include "util/filesystem_util.h"
@@ -61,12 +62,9 @@ TmpFileMgr::TmpFileMgr() :
 Status TmpFileMgr::init(MetricRegistry* metrics) {
     std::string tmp_dirs_spec = config::storage_root_path;
     vector<string> all_tmp_dirs;
-    // Empty string should be interpreted as no scratch
-    if (!tmp_dirs_spec.empty()) {
-        boost::split(
-                all_tmp_dirs, tmp_dirs_spec,
-                is_any_of(";"), boost::algorithm::token_compress_on);
-    }
+
+    // config::storage_root_path has already been parsed in OLAPRootPath, so reuse it.
+    OLAPRootPath::get_instance()->get_all_available_root_path(&all_tmp_dirs);
 
     return init_custom(all_tmp_dirs, true, metrics);
 }
diff --git a/be/test/agent/task_worker_pool_test.cpp b/be/test/agent/task_worker_pool_test.cpp
index 386e28de1d62f0..a49ac265442586 100644
--- a/be/test/agent/task_worker_pool_test.cpp
+++ b/be/test/agent/task_worker_pool_test.cpp
@@ -1323,7 +1323,7 @@ TEST(TaskWorkerPoolTest, TestReportDiskState) {
 
     // Get root path failed, report failed
 #if 0
-    EXPECT_CALL(mock_command_executor, get_all_root_path_stat(_))
+    EXPECT_CALL(mock_command_executor, get_all_root_path_info(_))
             .Times(1)
             .WillOnce(Return(OLAPStatus::OLAP_ERR_OTHER_ERROR));
     EXPECT_CALL(mock_master_server_client, report(_, _))
@@ -1332,7 +1332,7 @@ TEST(TaskWorkerPoolTest, TestReportDiskState) {
 #endif
 
     // Get root path success, report failed
-    EXPECT_CALL(mock_command_executor, get_all_root_path_stat(_))
+    EXPECT_CALL(mock_command_executor, get_all_root_path_info(_))
            .Times(1)
            .WillOnce(Return(OLAPStatus::OLAP_SUCCESS));
     EXPECT_CALL(mock_master_server_client, report(_, _))
@@ -1341,7 +1341,7 @@ TEST(TaskWorkerPoolTest, TestReportDiskState) {
     task_worker_pool._report_disk_state_worker_thread_callback(&task_worker_pool);
 
     // Get root path success, report success
-    EXPECT_CALL(mock_command_executor, get_all_root_path_stat(_))
+    EXPECT_CALL(mock_command_executor, get_all_root_path_info(_))
            .Times(1)
            .WillOnce(Return(OLAPStatus::OLAP_SUCCESS));
     EXPECT_CALL(mock_master_server_client, report(_, _))
diff --git a/be/test/olap/command_executor_test.cpp b/be/test/olap/command_executor_test.cpp
index 3ef0b4ef82dc94..69c19b7ab2ba37 100644
--- a/be/test/olap/command_executor_test.cpp
+++ b/be/test/olap/command_executor_test.cpp
@@ -622,10 +622,10 @@ TEST_F(TestReloadRootPath, reload_root_path) {
     ASSERT_EQ(OLAP_SUCCESS, remove_all_dir(root_path));
 }
 
-class TestGetRootPathStat : public ::testing::Test {
+class TestGetRootPathInfo : public ::testing::Test {
 public:
-    TestGetRootPathStat() : _command_executor(NULL) {}
-    ~TestGetRootPathStat() {
+    TestGetRootPathInfo() : _command_executor(NULL) {}
+    ~TestGetRootPathInfo() {
         SAFE_DELETE(_command_executor);
     }
 
@@ -652,14 +652,14 @@ class TestGetRootPathStat : public ::testing::Test {
     CommandExecutor* _command_executor;
 };
 
-TEST_F(TestGetRootPathStat, get_all_root_path_stat) {
+TEST_F(TestGetRootPathInfo, get_all_root_path_info) {
     OLAPStatus res = OLAP_SUCCESS;
-    std::vector<OLAPRootPathStat> root_paths_stat;
+    std::vector<RootPathInfo> root_paths_info;
 
-    res = _command_executor->get_all_root_path_stat(&root_paths_stat);
+    res = _command_executor->get_all_root_path_info(&root_paths_info);
     ASSERT_EQ(OLAP_SUCCESS, res);
-    ASSERT_EQ(1, root_paths_stat.size());
-    EXPECT_STREQ(config::storage_root_path.c_str(), root_paths_stat[0].root_path.c_str());
+    ASSERT_EQ(1, root_paths_info.size());
+    EXPECT_STREQ(config::storage_root_path.c_str(), root_paths_info[0].path.c_str());
 }
 
 class TestPush : public ::testing::Test {
diff --git a/be/test/olap/mock_command_executor.h b/be/test/olap/mock_command_executor.h
index 8b403fa2e10008..ae6e90bf342712 100644
--- a/be/test/olap/mock_command_executor.h
+++ b/be/test/olap/mock_command_executor.h
@@ -84,8 +84,8 @@ class MockCommandExecutor : public CommandExecutor {
     MOCK_METHOD1(reload_root_path, OLAPStatus(const std::string& root_paths));
     MOCK_METHOD2(check_table_exist, bool(TTabletId tablet_id, TSchemaHash schema_hash));
     MOCK_METHOD1(
-            get_all_root_path_stat,
-            OLAPStatus(std::vector<OLAPRootPathStat>* root_paths_stat));
+            get_all_root_path_info,
+            OLAPStatus(std::vector<RootPathInfo>* root_paths_info));
 };
 
 } // namespace palo
diff --git a/conf/be.conf b/conf/be.conf
index 6d08b34d43062c..06fa472423aaa6 100644
--- a/conf/be.conf
+++ b/conf/be.conf
@@ -1,34 +1,34 @@
 # INFO, WARNING, ERROR, FATAL
 sys_log_level = INFO
 
-# thirft port for fe to be
+# ports for admin, web, heartbeat service
 be_port = 9060
-# rpc port for be to be
 be_rpc_port = 9070
-# web server port
 webserver_port = 8040
-# heartbeat port
 heartbeat_service_port = 9050
+brpc_port = 8060
 
-# data storage path, seperate by ';'
-# Use .HDD as suffix to represent a SATA data storage. .SSD to SSD data storage.
-# Default is SATA
-storage_root_path = /home/disk1/palo.HDD;/home/disk2/palo.SSD;/home/disk3/palo
+# Choose one if there are more than one ip except loopback address.
+# Note that there should be at most one ip matching this list.
+# If no ip matches this rule, one will be chosen randomly.
+# Use CIDR format, e.g. 10.10.10.0/24
+# Default value is empty.
+# priority_networks = 10.10.10.0/24;192.168.0.0/16
+
+# data root path, separated by ';'
+# you can specify the storage medium of each root path, HDD or SSD
+# you can add a capacity limit at the end of each root path, separated by ','
+# e.g.:
+# storage_root_path = /home/disk1/palo.HDD,50;/home/disk2/palo.SSD,1;/home/disk2/palo
+# /home/disk1/palo.HDD, capacity limit is 50GB, HDD;
+# /home/disk2/palo.SSD, capacity limit is 1GB, SSD;
+# /home/disk2/palo, capacity limit is disk capacity, HDD (default)
+storage_root_path = /home/disk1/palo;/home/disk2/palo
 
 # Advanced configurations
 # sys_log_dir = ${PALO_HOME}/log
 # sys_log_roll_mode = SIZE-MB-1024
 # sys_log_roll_num = 10
-
-# Set which packages can print debug log. set * for all
 # sys_log_verbose_modules =
-
-# Set to -1 to flush log as soon as possible. For debug only.
 # log_buffer_level = -1
-
-# Choose one if there are more than one ip except loopback address.
-# Note that there should at most one ip match this list.
-# If no ip match this rule, will choose one randomly.
-# use CIDR format, e.g. 10.10.10.0/24
-# Default value is empty.
-# priority_networks = 10.10.10.0/24;192.168.0.0/16
+# palo_cgroups
diff --git a/conf/fe.conf b/conf/fe.conf
index 22cf838bd29977..782314ce80811b 100644
--- a/conf/fe.conf
+++ b/conf/fe.conf
@@ -4,14 +4,11 @@
 ## see fe/src/com/baidu/palo/common/Config.java
 #####################################################################
 
-# set JAVA_HOME or set JAVA_HOME in env variables
-# JAVA_HOME =
-
-# the output dir of log, stderr and stdout
+# the output dir of stderr and stdout
 LOG_DIR = ${PALO_HOME}/log
 
 DATE = `date +%Y%m%d-%H%M%S`
-JAVA_OPTS="-Xmx1024m -XX:+UseMembar -XX:SurvivorRatio=8 -XX:MaxTenuringThreshold=7 -XX:+PrintGCTimeStamps -XX:+PrintGCDetails -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+UseCMSCompactAtFullCollection -XX:CMSFullGCsBeforeCompaction=0 -XX:+CMSClassUnloadingEnabled -XX:-CMSParallelRemarkEnabled -XX:CMSInitiatingOccupancyFraction=80 -XX:SoftRefLRUPolicyMSPerMB=0 -Xloggc:$PALO_HOME/log/fe.gc.log.$DATE"
+JAVA_OPTS="-Xmx2024m -XX:+UseMembar -XX:SurvivorRatio=8 -XX:MaxTenuringThreshold=7 -XX:+PrintGCDateStamps -XX:+PrintGCDetails -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+UseCMSCompactAtFullCollection -XX:CMSFullGCsBeforeCompaction=0 -XX:+CMSClassUnloadingEnabled -XX:-CMSParallelRemarkEnabled -XX:CMSInitiatingOccupancyFraction=80 -XX:SoftRefLRUPolicyMSPerMB=0 -Xloggc:$PALO_HOME/log/fe.gc.log.$DATE"
 
 ##
 ## the lowercase properties are read by main program.
@@ -23,38 +20,29 @@ sys_log_level = INFO
 # store metadata, create it if it does not exist.
 meta_dir = ${PALO_HOME}/palo-meta
 
-# web server port
 http_port = 8030
-# thrift rpc port
 rpc_port = 9020
-# mysql port
 query_port = 9030
-# edit log port
 edit_log_port = 9010
 
-# Advanced configurations
-
-# log will be split by SIZE
-# sys_log_roll_mode = SIZE-MB-1024
+# Choose one if there are more than one ip except loopback address.
+# Note that there should be at most one ip matching this list.
+# If no ip matches this rule, one will be chosen randomly.
+# Use CIDR format, e.g. 10.10.10.0/24
+# Default value is empty.
+# priority_networks = 10.10.10.0/24;192.168.0.0/16
 
-# max num of log kept
+# Advanced configurations
+# sys_log_dir = ${PALO_HOME}/log
+# sys_log_roll_mode = "SIZE-MB-1024"
 # sys_log_roll_num = 10
-
-# set which packages can print debug log. set * for all
 # sys_log_verbose_modules =
-
 # audit_log_dir =
 # audit_log_modules = {"slow_query", "query"};
 # audit_log_roll_mode = "TIME-DAY"
 # audit_log_roll_num = 10
-# qe_slow_log_ms = 5000
+# meta_delay_toleration_second = 10
 # qe_max_connection = 1024
 # max_conn_per_user = 100
 # qe_query_timeout_second = 300
-
-# Choose one if there are more than one ip except loopback address.
-# Note that there should at most one ip match this list.
-# If no ip match this rule, will choose one randomly.
-# use CIDR format, e.g. 10.10.10.0/24
-# Default value is empty.
-# priority_networks = 10.10.10.0/24;192.168.0.0/16
+# qe_slow_log_ms = 5000
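The storage_root_path format documented in be.conf above (a path, an optional .HDD/.SSD medium suffix, an optional ,capacity limit in GB, entries separated by ';') can be parsed along the following lines. This is a hypothetical sketch for illustration only; parse_storage_root_path and ParsedRootPath are made-up names, not the actual OLAPRootPath::parse_root_paths_from_string implementation:

#include <cstdint>
#include <sstream>
#include <string>
#include <vector>

struct ParsedRootPath {
    std::string path;
    std::string medium = "HDD";  // default medium per the comment in be.conf
    int64_t capacity_gb = -1;    // -1 means "use the whole disk"
};

std::vector<ParsedRootPath> parse_storage_root_path(const std::string& spec) {
    std::vector<ParsedRootPath> result;
    std::stringstream ss(spec);
    std::string item;
    while (std::getline(ss, item, ';')) {
        if (item.empty()) {
            continue;
        }
        ParsedRootPath parsed;
        // optional ",capacity" suffix, e.g. "/home/disk1/palo.HDD,50"
        std::string::size_type comma = item.find(',');
        if (comma != std::string::npos) {
            parsed.capacity_gb = std::stoll(item.substr(comma + 1));
            item = item.substr(0, comma);
        }
        // optional ".SSD"/".HDD" medium suffix
        std::string::size_type dot = item.rfind('.');
        if (dot != std::string::npos) {
            std::string suffix = item.substr(dot + 1);
            if (suffix == "SSD" || suffix == "HDD") {
                parsed.medium = suffix;
                item = item.substr(0, dot);
            }
        }
        parsed.path = item;
        result.push_back(parsed);
    }
    return result;
}

For the example in be.conf, "/home/disk1/palo.HDD,50" would yield path /home/disk1/palo, medium HDD, and a 50GB capacity limit.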
-# priority_networks = 10.10.10.0/24;192.168.0.0/16 +# qe_slow_log_ms = 5000 diff --git a/fe/src/com/baidu/palo/analysis/Analyzer.java b/fe/src/com/baidu/palo/analysis/Analyzer.java index 751117bbda4604..197aaaac921142 100644 --- a/fe/src/com/baidu/palo/analysis/Analyzer.java +++ b/fe/src/com/baidu/palo/analysis/Analyzer.java @@ -218,6 +218,9 @@ private static class GlobalState { // by its right-hand side table ref) public final Map ijClauseByConjunct = Maps.newHashMap(); + // TODO chenhao16, to save conjuncts, which children are constant + public final Map constantConjunct = Maps.newHashMap(); + // map from slot id to the analyzer/block in which it was registered public final Map blockBySlot = Maps.newHashMap(); @@ -703,8 +706,13 @@ public void registerSemiJoinedTid(TupleId tid, TableRef rhsRef) { * Register all conjuncts in a list of predicates as Where clause conjuncts. */ public void registerConjuncts(List l) throws AnalysisException { + registerConjuncts(l, null); + } + + // register all conjuncts and handle constant conjuncts with ids + public void registerConjuncts(List l, List ids) throws AnalysisException { for (Expr e : l) { - registerConjuncts(e, true); + registerConjuncts(e, true, ids); } } @@ -713,9 +721,25 @@ public void registerConjuncts(List l) throws AnalysisException { * is assumed to originate from a WHERE or ON clause. */ public void registerConjuncts(Expr e, boolean fromHavingClause) throws AnalysisException { + registerConjuncts(e, fromHavingClause, null); + } + + // Register all conjuncts and handle constant conjuncts with ids + public void registerConjuncts(Expr e, boolean fromHavingClause, List ids) throws AnalysisException { for (Expr conjunct: e.getConjuncts()) { registerConjunct(conjunct); - markConstantConjunct(conjunct, fromHavingClause); + if (ids != null) { + for (TupleId id : ids) { + registerConstantConjunct(id, conjunct); + } + } + //markConstantConjunct(conjunct, fromHavingClause); + } + } + + private void registerConstantConjunct(TupleId id, Expr e) { + if (id != null && e.isConstant()) { + globalState.constantConjunct.put(id, e); } } @@ -838,6 +862,19 @@ public List getUnassignedConjuncts( List tupleIds, boolean inclOjConjuncts) { List result = Lists.newArrayList(); for (Expr e: globalState.conjuncts.values()) { + // handle constant conjuncts + if (e.isConstant()) { + boolean isBoundByTuple = false; + for (TupleId id : tupleIds) { + if (globalState.constantConjunct.containsKey(id)) { + isBoundByTuple = true; + break; + } + } + if (!isBoundByTuple) { + continue; + } + } if (e.isBoundByTupleIds(tupleIds) && !e.isAuxExpr() && !globalState.assignedConjuncts.contains(e.getId()) diff --git a/fe/src/com/baidu/palo/analysis/SelectStmt.java b/fe/src/com/baidu/palo/analysis/SelectStmt.java index ec1c30c4e098fb..927dc997cd119d 100644 --- a/fe/src/com/baidu/palo/analysis/SelectStmt.java +++ b/fe/src/com/baidu/palo/analysis/SelectStmt.java @@ -336,7 +336,7 @@ public void analyze(Analyzer analyzer) throws AnalysisException, InternalExcepti throw new AnalysisException( "WHERE clause must not contain analytic expressions: " + e.toSql()); } - analyzer.registerConjuncts(whereClause, false); + analyzer.registerConjuncts(whereClause, false, getTableRefIds()); } createSortInfo(analyzer); @@ -881,7 +881,7 @@ private void analyzeAggregation(Analyzer analyzer) throws AnalysisException { Preconditions.checkState(!havingPred.contains( Predicates.instanceOf(Subquery.class))); havingPred = havingPred.substitute(combinedSmap, analyzer, false); - 
analyzer.registerConjuncts(havingPred, true); + analyzer.registerConjuncts(havingPred, true, finalAggInfo.getOutputTupleId().asList()); if (LOG.isDebugEnabled()) { LOG.debug("post-agg havingPred: " + havingPred.debugString()); } diff --git a/fe/src/com/baidu/palo/catalog/Catalog.java b/fe/src/com/baidu/palo/catalog/Catalog.java index 585f3bddb5aa47..001cdca3a1e172 100644 --- a/fe/src/com/baidu/palo/catalog/Catalog.java +++ b/fe/src/com/baidu/palo/catalog/Catalog.java @@ -272,7 +272,7 @@ public class Catalog { private static Catalog CHECKPOINT = null; private static long checkpointThreadId = -1; private Checkpoint checkpointer; - private Pair helperNode = null; + private List> helperNodes = Lists.newArrayList(); private Pair selfNode = null; private Pair selfHostname = null; @@ -476,7 +476,7 @@ private void writeUnlock() { public void initialize(String[] args) throws Exception { // 0. get local node and helper node info getSelfHostPort(); - getHelperNode(args); + getHelperNodes(args); // 1. check and create dirs and files File meta = new File(metaDir); @@ -607,8 +607,9 @@ private void getClusterIdAndRole() throws IOException { // try to get role and node name from helper node, // this loop will not end until we get certain role type and name while(true) { - if (!getFeNodeTypeAndNameFromHelper()) { - LOG.warn("current node is not added to the group. please add it first."); + if (!getFeNodeTypeAndNameFromHelpers()) { + LOG.warn("current node is not added to the group. please add it first. " + + "sleep 5 seconds and retry, current helper nodes: {}", helperNodes); try { Thread.sleep(5000); continue; @@ -617,6 +618,7 @@ private void getClusterIdAndRole() throws IOException { System.exit(-1); } } + if (role == FrontendNodeType.REPLICA) { // for compatibility role = FrontendNodeType.FOLLOWER; @@ -624,9 +626,12 @@ private void getClusterIdAndRole() throws IOException { break; } + Preconditions.checkState(helperNodes.size() == 1); Preconditions.checkNotNull(role); Preconditions.checkNotNull(nodeName); + Pair rightHelperNode = helperNodes.get(0); + Storage storage = new Storage(IMAGE_DIR); if (roleFile.exists() && (role != storage.getRole() || !nodeName.equals(storage.getNodeName())) || !roleFile.exists()) { @@ -634,8 +639,8 @@ private void getClusterIdAndRole() throws IOException { } if (!versionFile.exists()) { // If the version file doesn't exist, download it from helper node - if (!getVersionFileFromHelper()) { - LOG.error("fail to download version file from " + helperNode.first + " will exit."); + if (!getVersionFileFromHelper(rightHelperNode)) { + LOG.error("fail to download version file from " + rightHelperNode.first + " will exit."); System.exit(-1); } @@ -653,7 +658,7 @@ private void getClusterIdAndRole() throws IOException { clusterId = storage.getClusterID(); token = storage.getToken(); try { - URL idURL = new URL("http://" + helperNode.first + ":" + Config.http_port + "/check"); + URL idURL = new URL("http://" + rightHelperNode.first + ":" + Config.http_port + "/check"); HttpURLConnection conn = null; conn = (HttpURLConnection) idURL.openConnection(); conn.setConnectTimeout(2 * 1000); @@ -661,7 +666,7 @@ private void getClusterIdAndRole() throws IOException { String clusterIdString = conn.getHeaderField(MetaBaseAction.CLUSTER_ID); int remoteClusterId = Integer.parseInt(clusterIdString); if (remoteClusterId != clusterId) { - LOG.error("cluster id is not equal with helper node {}. will exit.", helperNode.first); + LOG.error("cluster id is not equal with helper node {}. 
will exit.", rightHelperNode.first); System.exit(-1); } String remoteToken = conn.getHeaderField(MetaBaseAction.TOKEN); @@ -675,7 +680,7 @@ private void getClusterIdAndRole() throws IOException { Preconditions.checkNotNull(token); Preconditions.checkNotNull(remoteToken); if (!token.equals(remoteToken)) { - LOG.error("token is not equal with helper node {}. will exit.", helperNode.first); + LOG.error("token is not equal with helper node {}. will exit.", rightHelperNode.first); System.exit(-1); } } @@ -685,7 +690,7 @@ private void getClusterIdAndRole() throws IOException { } } - getNewImage(); + getNewImage(rightHelperNode); } if (Config.cluster_id != -1 && clusterId != Config.cluster_id) { @@ -699,6 +704,7 @@ private void getClusterIdAndRole() throws IOException { isElectable = false; } + Preconditions.checkState(helperNodes.size() == 1); LOG.info("finished to get cluster id: {}, role: {} and node name: {}", clusterId, role.name(), nodeName); } @@ -714,34 +720,52 @@ public static String genFeNodeName(String host, int port, boolean isOldStyle) { // Get the role info and node name from helper node. // return false if failed. - private boolean getFeNodeTypeAndNameFromHelper() { - try { - URL url = new URL("http://" + helperNode.first + ":" + Config.http_port - + "/role?host=" + selfNode.first + "&port=" + selfNode.second); - HttpURLConnection conn = null; - conn = (HttpURLConnection) url.openConnection(); - String type = conn.getHeaderField("role"); - if (type == null) { - LOG.warn("failed to get fe node type from helper node: {}.",helperNode); - return false; - } - role = FrontendNodeType.valueOf(type); - if (role == FrontendNodeType.UNKNOWN) { - LOG.warn("frontend {} is not added to cluster yet. role UNKNOWN", selfNode); - return false; - } - - nodeName = conn.getHeaderField("name"); - if (Strings.isNullOrEmpty(nodeName)) { - // For forward compatibility, we use old-style name: "ip_port" - nodeName = genFeNodeName(selfNode.first, selfNode.second, true /* old style */); + private boolean getFeNodeTypeAndNameFromHelpers() { + // we try to get info from helper nodes, once we get the right helper node, + // other helper nodes will be ignored and removed. + Pair rightHelperNode = null; + for (Pair helperNode : helperNodes) { + try { + URL url = new URL("http://" + helperNode.first + ":" + Config.http_port + + "/role?host=" + selfNode.first + "&port=" + selfNode.second); + HttpURLConnection conn = null; + conn = (HttpURLConnection) url.openConnection(); + String type = conn.getHeaderField("role"); + if (type == null) { + LOG.warn("failed to get fe node type from helper node: {}.", helperNode); + continue; + } + role = FrontendNodeType.valueOf(type); + nodeName = conn.getHeaderField("name"); + + // get role and node name before checking them, because we want to throw any exception + // as early as we encounter. + + if (role == FrontendNodeType.UNKNOWN) { + LOG.warn("frontend {} is not added to cluster yet. 
role UNKNOWN", selfNode); + return false; + } + + if (Strings.isNullOrEmpty(nodeName)) { + // For forward compatibility, we use old-style name: "ip_port" + nodeName = genFeNodeName(selfNode.first, selfNode.second, true /* old style */); + } + } catch (Exception e) { + LOG.warn("failed to get fe node type from helper node: {}.", helperNode, e); + continue; } - } catch (Exception e) { - LOG.warn("failed to get fe node type from helper node: {}.", helperNode, e); + + LOG.info("get fe node type {}, name {} from {}:{}", role, nodeName, helperNode.first, Config.http_port); + rightHelperNode = helperNode; + break; + } + + if (rightHelperNode == null) { return false; } - LOG.info("get fe node type {}, name {} from {}:{}", role, nodeName, helperNode.first, Config.http_port); + helperNodes.clear(); + helperNodes.add(rightHelperNode); return true; } @@ -751,15 +775,15 @@ private void getSelfHostPort() { LOG.debug("get self node: {}, self hostname: {}", selfNode, selfHostname); } - private void getHelperNode(String[] args) throws AnalysisException { - String helper = null; + private void getHelperNodes(String[] args) throws AnalysisException { + String helpers = null; for (int i = 0; i < args.length; i++) { if (args[i].equalsIgnoreCase("-helper")) { if (i + 1 >= args.length) { - System.out.println("-helper need parameter host:port"); + System.out.println("-helper need parameter host:port,host:port"); System.exit(-1); } - helper = args[i + 1]; + helpers = args[i + 1]; break; } } @@ -779,15 +803,21 @@ private void getHelperNode(String[] args) throws AnalysisException { getHelperNodeFromDeployManager(); } else { - // If helper node is not designated, use local node as helper node. - if (helper != null) { - helperNode = SystemInfoService.validateHostAndPort(helper); + if (helpers != null) { + String[] splittedHelpers = helpers.split(","); + for (String helper : splittedHelpers) { + helperNodes.add(SystemInfoService.validateHostAndPort(helper)); + } } else { - helperNode = new Pair(selfNode.first, Config.edit_log_port); + // If helper node is not designated, use local node as helper node. + helperNodes.add(Pair.create(selfNode.first, Config.edit_log_port)); } } + + LOG.info("get helper nodes: {}", helperNodes); } + @SuppressWarnings("unchecked") private void getHelperNodeFromDeployManager() { Preconditions.checkNotNull(deployManager); @@ -805,14 +835,14 @@ private void getHelperNodeFromDeployManager() { // This is not the first time this node start up. // It should already added to FE group, just set helper node as it self. LOG.info("role file exist. this is not the first time to start up"); - helperNode = new Pair(selfNode.first, Config.edit_log_port); + helperNodes = Lists.newArrayList(Pair.create(selfNode.first, Config.edit_log_port)); return; } // This is the first time this node start up. // Get helper node from deploy manager. - helperNode = deployManager.getHelperNode(); - if (helperNode == null) { + helperNodes = deployManager.getHelperNodes(); + if (helperNodes == null || helperNodes.isEmpty()) { LOG.error("failed to get helper node from deploy manager. 
exit"); System.exit(-1); } @@ -987,7 +1017,7 @@ private void checkCurrentNodeExist() { } } - private boolean getVersionFileFromHelper() throws IOException { + private boolean getVersionFileFromHelper(Pair helperNode) throws IOException { try { String url = "http://" + helperNode.first + ":" + Config.http_port + "/version"; File dir = new File(IMAGE_DIR); @@ -1002,7 +1032,7 @@ private boolean getVersionFileFromHelper() throws IOException { return false; } - private void getNewImage() throws IOException { + private void getNewImage(Pair helperNode) throws IOException { long localImageVersion = 0; Storage storage = new Storage(IMAGE_DIR); localImageVersion = storage.getImageSeq(); @@ -1024,12 +1054,23 @@ private void getNewImage() throws IOException { } } - public boolean isMyself() { + private boolean isMyself() { Preconditions.checkNotNull(selfNode); - Preconditions.checkNotNull(helperNode); - LOG.debug("self ip-port: {}:{}. helper ip-port: {}:{}", selfNode.first, selfNode.second, helperNode.first, - helperNode.second); - return selfNode.equals(helperNode); + Preconditions.checkNotNull(helperNodes); + LOG.debug("self: {}. helpers: {}", selfNode, helperNodes); + // if helper nodes contain it self, remove other helpers + boolean containSelf = false; + for (Pair helperNode : helperNodes) { + if (selfNode.equals(helperNode)) { + containSelf = true; + } + } + if (containSelf) { + helperNodes.clear(); + helperNodes.add(selfNode); + } + + return containSelf; } private StorageInfo getStorageInfo(URL url) throws IOException { @@ -4109,7 +4150,8 @@ public FrontendNodeType getRole() { } public Pair getHelperNode() { - return this.helperNode; + Preconditions.checkState(helperNodes.size() == 1); + return this.helperNodes.get(0); } public Pair getSelfNode() { diff --git a/fe/src/com/baidu/palo/catalog/Table.java b/fe/src/com/baidu/palo/catalog/Table.java index 7c2c29eec14f6b..4809ba0c3aba85 100644 --- a/fe/src/com/baidu/palo/catalog/Table.java +++ b/fe/src/com/baidu/palo/catalog/Table.java @@ -18,225 +18,224 @@ // specific language governing permissions and limitations // under the License. -package com.baidu.palo.catalog; - -import com.baidu.palo.analysis.CreateTableStmt; -import com.baidu.palo.analysis.TableName; -import com.baidu.palo.common.InternalException; -import com.baidu.palo.common.io.Text; -import com.baidu.palo.common.io.Writable; -import com.baidu.palo.thrift.TTableDescriptor; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; - -import org.apache.commons.lang.NotImplementedException; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -/** - * Internal representation of table-related metadata. A table contains several partitions. 
- */ -public class Table extends MetaObject implements Writable { - public enum TableType { - MYSQL, - OLAP, - SCHEMA, - INLINE_VIEW, - VIEW, - KUDU, - BROKER - } - - protected long id; - protected String name; - protected TableType type; - protected List baseSchema; - // tree map for case-insensitive lookup - protected Map nameToColumn; - - public Table(TableType type) { - this.type = type; - this.baseSchema = new LinkedList(); - this.nameToColumn = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); - } - - public Table(long id, String tableName, TableType type, List baseSchema) { - this.id = id; - this.name = tableName; - this.type = type; - this.baseSchema = baseSchema; - - this.nameToColumn = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); - if (baseSchema != null) { - for (Column col : baseSchema) { - nameToColumn.put(col.getName(), col); - } - } else { - // Only view in with-clause have null base - Preconditions.checkArgument(type == TableType.VIEW, "Table has no columns"); - } - } - - public long getId() { - return id; - } - - public String getName() { - return name; - } - - public TableType getType() { - return type; - } - - public int getKeysNum() { - int keysNum = 0; - for (Column column : baseSchema) { - if (column.isKey()) { - keysNum += 1; - } - } - return keysNum; - } - - public List getBaseSchema() { - return baseSchema; - } - - public void setNewBaseSchema(List newSchema) { - this.baseSchema = newSchema; - this.nameToColumn.clear(); - for (Column col : baseSchema) { - nameToColumn.put(col.getName(), col); - } - } - - public Column getColumn(String name) { - return nameToColumn.get(name); - } - - public TTableDescriptor toThrift() { - return null; - } - - public static Table read(DataInput in) throws IOException { - Table table = null; - TableType type = TableType.valueOf(Text.readString(in)); - if (type == TableType.OLAP) { - table = new OlapTable(); - table.readFields(in); - } else if (type == TableType.MYSQL) { - table = new MysqlTable(); - table.readFields(in); - } else if (type == TableType.VIEW) { - View view = new View(); - view.readFields(in); - try { - view.init(); - } catch (InternalException e) { - throw new IOException(e.getMessage()); - } - table = view; - } else if (type == TableType.KUDU) { - table = new KuduTable(); - table.readFields(in); - } else if (type == TableType.BROKER) { - table = new BrokerTable(); - table.readFields(in); - } else { - throw new IOException("Unknown table type: " + type.name()); - } - - return table; - } - - @Override - public void write(DataOutput out) throws IOException { - // ATTN: must write type first - Text.writeString(out, type.name()); - - // write last check time - super.write(out); - - out.writeLong(id); - Text.writeString(out, name); - - // base schema - int columnCount = baseSchema.size(); - out.writeInt(columnCount); - for (Column column : baseSchema) { - column.write(out); - } - } - - @Override - public void readFields(DataInput in) throws IOException { - super.readFields(in); - - this.id = in.readLong(); - this.name = Text.readString(in); - - // base schema - int columnCount = in.readInt(); - for (int i = 0; i < columnCount; i++) { - Column column = Column.read(in); - this.baseSchema.add(column); - this.nameToColumn.put(column.getName(), column); - } - } - - public boolean equals(Table table) { - return true; - } - - // return if this table is partitioned. 
- // For OlapTable ture when is partitioned, or distributed by hash when no partition - public boolean isPartitioned() { - return false; - } - - public Partition getPartition(String partitionName) { - return null; - } - - public String getEngine() { - if (this instanceof OlapTable) { - return "Palo"; - } else if (this instanceof MysqlTable) { - return "MySQL"; - } else if (this instanceof SchemaTable) { - return "MEMORY"; - } else { - return null; - } - } - - public String getMysqlType() { - if (this instanceof View) { - return "VIEW"; - } - return "BASE TABLE"; - } - - public String getComment() { - if (this instanceof View) { - return "VIEW"; - } - return ""; - } - - public CreateTableStmt toCreateTableStmt(String dbName) { - throw new NotImplementedException(); - } - - @Override - public int getSignature(int signatureVersion) { - throw new NotImplementedException(); - } -} +package com.baidu.palo.catalog; + +import com.baidu.palo.analysis.CreateTableStmt; +import com.baidu.palo.common.InternalException; +import com.baidu.palo.common.io.Text; +import com.baidu.palo.common.io.Writable; +import com.baidu.palo.thrift.TTableDescriptor; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; + +import org.apache.commons.lang.NotImplementedException; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * Internal representation of table-related metadata. A table contains several partitions. + */ +public class Table extends MetaObject implements Writable { + public enum TableType { + MYSQL, + OLAP, + SCHEMA, + INLINE_VIEW, + VIEW, + KUDU, + BROKER + } + + protected long id; + protected String name; + protected TableType type; + protected List baseSchema; + // tree map for case-insensitive lookup + protected Map nameToColumn; + + public Table(TableType type) { + this.type = type; + this.baseSchema = new LinkedList(); + this.nameToColumn = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + } + + public Table(long id, String tableName, TableType type, List baseSchema) { + this.id = id; + this.name = tableName; + this.type = type; + this.baseSchema = baseSchema; + + this.nameToColumn = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + if (baseSchema != null) { + for (Column col : baseSchema) { + nameToColumn.put(col.getName(), col); + } + } else { + // Only view in with-clause have null base + Preconditions.checkArgument(type == TableType.VIEW, "Table has no columns"); + } + } + + public long getId() { + return id; + } + + public String getName() { + return name; + } + + public TableType getType() { + return type; + } + + public int getKeysNum() { + int keysNum = 0; + for (Column column : baseSchema) { + if (column.isKey()) { + keysNum += 1; + } + } + return keysNum; + } + + public List getBaseSchema() { + return baseSchema; + } + + public void setNewBaseSchema(List newSchema) { + this.baseSchema = newSchema; + this.nameToColumn.clear(); + for (Column col : baseSchema) { + nameToColumn.put(col.getName(), col); + } + } + + public Column getColumn(String name) { + return nameToColumn.get(name); + } + + public TTableDescriptor toThrift() { + return null; + } + + public static Table read(DataInput in) throws IOException { + Table table = null; + TableType type = TableType.valueOf(Text.readString(in)); + if (type == TableType.OLAP) { + table = new OlapTable(); + table.readFields(in); + } else if (type == TableType.MYSQL) { + table = new MysqlTable(); + 
table.readFields(in); + } else if (type == TableType.VIEW) { + View view = new View(); + view.readFields(in); + try { + view.init(); + } catch (InternalException e) { + throw new IOException(e.getMessage()); + } + table = view; + } else if (type == TableType.KUDU) { + table = new KuduTable(); + table.readFields(in); + } else if (type == TableType.BROKER) { + table = new BrokerTable(); + table.readFields(in); + } else { + throw new IOException("Unknown table type: " + type.name()); + } + + return table; + } + + @Override + public void write(DataOutput out) throws IOException { + // ATTN: must write type first + Text.writeString(out, type.name()); + + // write last check time + super.write(out); + + out.writeLong(id); + Text.writeString(out, name); + + // base schema + int columnCount = baseSchema.size(); + out.writeInt(columnCount); + for (Column column : baseSchema) { + column.write(out); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + + this.id = in.readLong(); + this.name = Text.readString(in); + + // base schema + int columnCount = in.readInt(); + for (int i = 0; i < columnCount; i++) { + Column column = Column.read(in); + this.baseSchema.add(column); + this.nameToColumn.put(column.getName(), column); + } + } + + public boolean equals(Table table) { + return true; + } + + // return if this table is partitioned. + // For OlapTable ture when is partitioned, or distributed by hash when no partition + public boolean isPartitioned() { + return false; + } + + public Partition getPartition(String partitionName) { + return null; + } + + public String getEngine() { + if (this instanceof OlapTable) { + return "Palo"; + } else if (this instanceof MysqlTable) { + return "MySQL"; + } else if (this instanceof SchemaTable) { + return "MEMORY"; + } else { + return null; + } + } + + public String getMysqlType() { + if (this instanceof View) { + return "VIEW"; + } + return "BASE TABLE"; + } + + public String getComment() { + if (this instanceof View) { + return "VIEW"; + } + return ""; + } + + public CreateTableStmt toCreateTableStmt(String dbName) { + throw new NotImplementedException(); + } + + @Override + public int getSignature(int signatureVersion) { + throw new NotImplementedException(); + } +} diff --git a/fe/src/com/baidu/palo/clone/ClusterLoadStatistic.java b/fe/src/com/baidu/palo/clone/ClusterLoadStatistic.java index cc318b13daa9aa..ab878c388dc8c6 100644 --- a/fe/src/com/baidu/palo/clone/ClusterLoadStatistic.java +++ b/fe/src/com/baidu/palo/clone/ClusterLoadStatistic.java @@ -133,6 +133,15 @@ public synchronized List> getBackendStatistic(long beId) { return statistics; } + public synchronized BackendLoadStatistic getBackendLoadStatistic(long beId) { + for (BackendLoadStatistic backendLoadStatistic : beLoadStatistics) { + if (backendLoadStatistic.getBeId() == beId) { + return backendLoadStatistic; + } + } + return null; + } + /* * Try to choose 2 backend root paths as source and destination for the specified tablet to recovery. 
*/ diff --git a/fe/src/com/baidu/palo/clone/LoadBalancer.java b/fe/src/com/baidu/palo/clone/LoadBalancer.java index c34fb3bed764c9..6cd75b94f30544 100644 --- a/fe/src/com/baidu/palo/clone/LoadBalancer.java +++ b/fe/src/com/baidu/palo/clone/LoadBalancer.java @@ -17,10 +17,13 @@ import com.baidu.palo.catalog.Catalog; import com.baidu.palo.common.util.Daemon; +import com.baidu.palo.system.SystemInfoService; + +import java.util.List; public class LoadBalancer extends Daemon { - ClusterLoadStatistic clusterLoadStatistic; + private ClusterLoadStatistic clusterLoadStatistic; public LoadBalancer() { super("load balancer", 60 * 1000); @@ -31,6 +34,18 @@ public LoadBalancer() { @Override protected void runOneCycle() { + clusterLoadStatistic.init(SystemInfoService.DEFAULT_CLUSTER); + } + + public BackendLoadStatistic getBackendStatistic(long beId) { + return clusterLoadStatistic.getBackendLoadStatistic(beId); + } + + public List> getClusterStatisticInfo() { + return clusterLoadStatistic.getCLusterStatistic(); + } + public List> getBackendStatisticInfo(long beId) { + return clusterLoadStatistic.getBackendStatistic(beId); } } diff --git a/fe/src/com/baidu/palo/common/Config.java b/fe/src/com/baidu/palo/common/Config.java index 8592e250df7baa..e7a629dfb58b7d 100644 --- a/fe/src/com/baidu/palo/common/Config.java +++ b/fe/src/com/baidu/palo/common/Config.java @@ -491,7 +491,7 @@ public class Config extends ConfigBase { * disable: no deploy manager * k8s: Kubernetes * ambari: Ambari - * local: Local File (test only) + * local: Local File (for test or Boxer2 BCC version) */ @ConfField public static String enable_deploy_manager = "disable"; diff --git a/fe/src/com/baidu/palo/common/proc/BackendsProcDir.java b/fe/src/com/baidu/palo/common/proc/BackendsProcDir.java index f6af9b5809997c..d2e79e6d24bf79 100644 --- a/fe/src/com/baidu/palo/common/proc/BackendsProcDir.java +++ b/fe/src/com/baidu/palo/common/proc/BackendsProcDir.java @@ -32,18 +32,25 @@ import com.baidu.palo.system.SystemInfoService; import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.TimeUnit; public class BackendsProcDir implements ProcDirInterface { + private static final Logger LOG = LogManager.getLogger(BackendsProcDir.class); + public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() .add("BackendId").add("Cluster").add("IP").add("HostName").add("HeartbeatPort") .add("BePort").add("HttpPort").add("brpcPort").add("LastStartTime").add("LastHeartbeat").add("Alive") @@ -108,6 +115,9 @@ public static List> getClusterBackendInfos(String clusterName) { return backendInfos; } } + + long start = System.currentTimeMillis(); + Stopwatch watch = Stopwatch.createUnstarted(); List> comparableBackendInfos = new LinkedList>(); for (long backendId : backendIds) { Backend backend = clusterInfoService.getBackend(backendId); @@ -115,7 +125,6 @@ public static List> getClusterBackendInfos(String clusterName) { continue; } - String ip = "N/A"; String hostName = "N/A"; try { InetAddress address = InetAddress.getByName(backend.getHost()); @@ -124,7 +133,9 @@ public static List> 
diff --git a/fe/src/com/baidu/palo/common/proc/BackendsProcDir.java b/fe/src/com/baidu/palo/common/proc/BackendsProcDir.java
index f6af9b5809997c..d2e79e6d24bf79 100644
--- a/fe/src/com/baidu/palo/common/proc/BackendsProcDir.java
+++ b/fe/src/com/baidu/palo/common/proc/BackendsProcDir.java
@@ -32,18 +32,25 @@
 import com.baidu.palo.system.SystemInfoService;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 public class BackendsProcDir implements ProcDirInterface {
+    private static final Logger LOG = LogManager.getLogger(BackendsProcDir.class);
+
     public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
             .add("BackendId").add("Cluster").add("IP").add("HostName").add("HeartbeatPort")
             .add("BePort").add("HttpPort").add("brpcPort").add("LastStartTime").add("LastHeartbeat").add("Alive")
@@ -108,6 +115,9 @@ public static List<List<String>> getClusterBackendInfos(String clusterName) {
                 return backendInfos;
             }
         }
+
+        long start = System.currentTimeMillis();
+        Stopwatch watch = Stopwatch.createUnstarted();
         List<List<String>> comparableBackendInfos = new LinkedList<List<String>>();
         for (long backendId : backendIds) {
             Backend backend = clusterInfoService.getBackend(backendId);
             if (backend == null) {
                 continue;
             }
 
-            String ip = "N/A";
             String hostName = "N/A";
             try {
                 InetAddress address = InetAddress.getByName(backend.getHost());
@@ -124,7 +133,9 @@ public static List<List<String>> getClusterBackendInfos(String clusterName) {
                 continue;
             }
 
+            watch.start();
             Integer tabletNum = Catalog.getCurrentInvertedIndex().getTabletNumByBackendId(backendId);
+            watch.stop();
             List<String> backendInfo = Lists.newArrayList();
             backendInfo.add(String.valueOf(backendId));
             backendInfo.add(backend.getOwnerClusterName());
@@ -163,12 +174,15 @@ public static List<List<String>> getClusterBackendInfos(String clusterName) {
             if (backend.getTotalCapacityB() <= 0) {
                 free = 0.0;
             } else {
-                free = (double) backend.getAvailableCapacityB() * 100 / backend.getTotalCapacityB();
+                free = (double) backend.getDataUsedCapacityB() * 100 / backend.getTotalCapacityB();
             }
             backendInfo.add(String.format("%.2f", free) + " %");
 
             comparableBackendInfos.add(backendInfo);
         }
+        // the backends proc node can be slow to produce its result; log the cost so we can observe it
+        LOG.info("backends proc get tablet num cost: {}, total cost: {}",
+                watch.elapsed(TimeUnit.MILLISECONDS), (System.currentTimeMillis() - start));
 
         // sort by cluster name, host name
         ListComparator<List<String>> comparator = new ListComparator<List<String>>(1, 3);
diff --git a/fe/src/com/baidu/palo/common/proc/ClusterLoadStatisticProcDir.java b/fe/src/com/baidu/palo/common/proc/ClusterLoadStatisticProcDir.java
index 564860511be4bc..aa5eae343d06bd 100644
--- a/fe/src/com/baidu/palo/common/proc/ClusterLoadStatisticProcDir.java
+++ b/fe/src/com/baidu/palo/common/proc/ClusterLoadStatisticProcDir.java
@@ -38,8 +38,8 @@ public ProcResult fetchResult() throws AnalysisException {
         result.setNames(TITLE_NAMES);
 
         statistic = new ClusterLoadStatistic(Catalog.getCurrentCatalog(),
-                Catalog.getCurrentSystemInfo(),
-                Catalog.getCurrentInvertedIndex());
+                                             Catalog.getCurrentSystemInfo(),
+                                             Catalog.getCurrentInvertedIndex());
         statistic.init(SystemInfoService.DEFAULT_CLUSTER);
         List<List<String>> statistics = statistic.getCLusterStatistic();
         result.setRows(statistics);
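The instrumentation added to BackendsProcDir pairs a wall-clock total with an unstarted Guava Stopwatch that accumulates only the suspected-slow call across loop iterations, so the log line separates the inverted-index cost from everything else. A self-contained sketch of the same pattern, with work() standing in for getTabletNumByBackendId:

    import com.google.common.base.Stopwatch;
    import java.util.concurrent.TimeUnit;

    public class TimingSketch {
        public static void main(String[] args) {
            long start = System.currentTimeMillis();
            Stopwatch watch = Stopwatch.createUnstarted();
            for (int i = 0; i < 1000; i++) {
                watch.start();  // accumulate only the call under suspicion
                work();
                watch.stop();
                // other per-iteration work counts toward the total only
            }
            System.out.printf("suspect: %d ms, total: %d ms%n",
                    watch.elapsed(TimeUnit.MILLISECONDS),
                    System.currentTimeMillis() - start);
        }

        private static void work() { /* stand-in for the measured call */ }
    }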
diff --git a/fe/src/com/baidu/palo/common/proc/StatisticProcDir.java b/fe/src/com/baidu/palo/common/proc/StatisticProcDir.java
index e10b24d8ccfe5c..dd092420d2b6a5 100644
--- a/fe/src/com/baidu/palo/common/proc/StatisticProcDir.java
+++ b/fe/src/com/baidu/palo/common/proc/StatisticProcDir.java
@@ -20,28 +20,29 @@
 package com.baidu.palo.common.proc;
 
-import com.baidu.palo.catalog.Catalog;
-import com.baidu.palo.catalog.Database;
-import com.baidu.palo.catalog.MaterializedIndex;
-import com.baidu.palo.catalog.OlapTable;
-import com.baidu.palo.catalog.Replica;
-import com.baidu.palo.catalog.Replica.ReplicaState;
-import com.baidu.palo.catalog.Tablet;
-import com.baidu.palo.catalog.Partition;
-import com.baidu.palo.catalog.Table;
-import com.baidu.palo.catalog.Table.TableType;
-import com.baidu.palo.common.AnalysisException;
-import com.baidu.palo.common.util.ListComparator;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import com.baidu.palo.catalog.Catalog;
+import com.baidu.palo.catalog.Database;
+import com.baidu.palo.catalog.MaterializedIndex;
+import com.baidu.palo.catalog.OlapTable;
+import com.baidu.palo.catalog.Partition;
+import com.baidu.palo.catalog.Replica;
+import com.baidu.palo.catalog.Replica.ReplicaState;
+import com.baidu.palo.catalog.Table;
+import com.baidu.palo.catalog.Table.TableType;
+import com.baidu.palo.catalog.Tablet;
+import com.baidu.palo.common.AnalysisException;
+import com.baidu.palo.common.util.ListComparator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 
 public class StatisticProcDir implements ProcDirInterface {
@@ -218,5 +219,97 @@ public ProcNodeInterface lookup(String dbIdStr) throws AnalysisException {
         }
 
         return new IncompleteTabletsProcNode(incompleteTabletIds.get(dbId), inconsistentTabletIds.get(dbId));
+    }
+
+    // used for metrics
+    public static List<Integer> getStatistic() {
+        int totalDbNum = 0;
+        int totalTableNum = 0;
+        int totalPartitionNum = 0;
+        int totalIndexNum = 0;
+        int totalTabletNum = 0;
+        int totalReplicaNum = 0;
+        int totalIncompleteTabletNum = 0;
+        int totalInconsistentTabletNum = 0;
+
+        List<Integer> result = Lists.newArrayList(0, 0, 0, 0, 0, 0, 0, 0);
+
+        List<Long> dbIds = Catalog.getInstance().getDbIds();
+        if (dbIds == null || dbIds.isEmpty()) {
+            // empty
+            return result;
+        }
+
+        // get alive backends
+        Set<Long> aliveBackendIds = Sets.newHashSet(Catalog.getCurrentSystemInfo().getBackendIds(true));
+        for (Long dbId : dbIds) {
+            if (dbId == 0) {
+                // skip information_schema database
+                continue;
+            }
+            Database db = Catalog.getInstance().getDb(dbId);
+            if (db == null) {
+                continue;
+            }
+
+            ++totalDbNum;
+            db.readLock();
+            try {
+                for (Table table : db.getTables()) {
+                    if (table.getType() != TableType.OLAP) {
+                        continue;
+                    }
+
+                    ++totalTableNum;
+                    OlapTable olapTable = (OlapTable) table;
+
+                    for (Partition partition : olapTable.getPartitions()) {
+                        short replicationNum = olapTable.getPartitionInfo().getReplicationNum(partition.getId());
+                        ++totalPartitionNum;
+                        for (MaterializedIndex materializedIndex : partition.getMaterializedIndices()) {
+                            ++totalIndexNum;
+                            for (Tablet tablet : materializedIndex.getTablets()) {
+                                int onlineReplicaNum = 0;
+                                ++totalTabletNum;
+                                List<Replica> replicas = tablet.getReplicas();
+                                totalReplicaNum += replicas.size();
+
+                                for (Replica replica : tablet.getReplicas()) {
+                                    ReplicaState state = replica.getState();
+                                    if (state != ReplicaState.NORMAL && state != ReplicaState.SCHEMA_CHANGE) {
+                                        continue;
+                                    }
+                                    if (!aliveBackendIds.contains(replica.getBackendId())) {
+                                        continue;
+                                    }
+                                    ++onlineReplicaNum;
+                                }
+
+                                if (onlineReplicaNum < replicationNum) {
+                                    ++totalIncompleteTabletNum;
+                                }
+
+                                if (!tablet.isConsistent()) {
+                                    ++totalInconsistentTabletNum;
+                                }
+                            } // end for tablets
+                        } // end for indices
+                    } // end for partitions
+                } // end for tables
+            } finally {
+                db.readUnlock();
+            }
+        } // end for dbs
+
+        // the list was pre-sized with eight zeros above, so fill by index rather than append
+        result.set(0, totalDbNum);
+        result.set(1, totalTableNum);
+        result.set(2, totalPartitionNum);
+        result.set(3, totalIndexNum);
+        result.set(4, totalTabletNum);
+        result.set(5, totalReplicaNum);
+        result.set(6, totalIncompleteTabletNum);
+        result.set(7, totalInconsistentTabletNum);
+
+        return result;
     }
 }
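getStatistic returns its eight counters by position, which is why the method fills the pre-sized list with set() rather than appending. A hypothetical consumer, with an illustrative index constant that mirrors the fill order (db, table, partition, index, tablet, replica, incomplete tablet, inconsistent tablet):

    import java.util.List;

    // Hypothetical reader of StatisticProcDir.getStatistic(); not part of this patch.
    class TabletNumGauge {
        private static final int IDX_TABLET_NUM = 4;

        int value() {
            List<Integer> stats = StatisticProcDir.getStatistic();
            return stats.get(IDX_TABLET_NUM);
        }
    }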
diff --git a/fe/src/com/baidu/palo/deploy/DeployManager.java b/fe/src/com/baidu/palo/deploy/DeployManager.java
index ffe5d65aa7b232..732e2caebd7331 100644
--- a/fe/src/com/baidu/palo/deploy/DeployManager.java
+++ b/fe/src/com/baidu/palo/deploy/DeployManager.java
@@ -86,11 +86,17 @@ public class DeployManager extends Daemon {
     private static final Logger LOG = LogManager.getLogger(DeployManager.class);
 
+    // We misspelled the environment variable ENV_FE_EXIST_ENT(D)POINT. But for backward compatibility,
+    // we have to keep the misspelled name around for a while.
+    // TODO(cmy): remove it later
+    @Deprecated
     public static final String ENV_FE_EXIST_ENTPOINT = "FE_EXIST_ENTPOINT";
+
+    public static final String ENV_FE_EXIST_ENDPOINT = "FE_EXIST_ENDPOINT";
     public static final String ENV_FE_INIT_NUMBER = "FE_INIT_NUMBER";
 
     public enum NodeType {
-        ELECTABLE, OBSERVER, BACKEND
+        ELECTABLE, OBSERVER, BACKEND, BROKER
     }
 
     protected Catalog catalog;
@@ -195,25 +201,34 @@ protected Map<String, List<Pair<String, Integer>>> getBrokerGroupHostPorts() {
         throw new NotImplementedException();
     }
 
-    public Pair<String, Integer> getHelperNode() {
-        final String existFeHosts = System.getenv(ENV_FE_EXIST_ENTPOINT);
+    public List<Pair<String, Integer>> getHelperNodes() {
+        String existFeHosts = System.getenv(ENV_FE_EXIST_ENTPOINT);
+        if (Strings.isNullOrEmpty(existFeHosts)) {
+            existFeHosts = System.getenv(ENV_FE_EXIST_ENDPOINT);
+        }
         if (!Strings.isNullOrEmpty(existFeHosts)) {
             // Some Frontends already exist in service group.
-            // Arbitrarily choose the first one as helper node to start up
+            // We consider them as helper nodes
+            List<Pair<String, Integer>> helperNodes = Lists.newArrayList();
             String[] splittedHosts = existFeHosts.split(",");
-            String[] splittedHostPort = splittedHosts[0].split(":");
-            if (splittedHostPort.length != 2) {
-                LOG.error("Invalid exist fe hosts: {}. will exit", existFeHosts);
-                System.exit(-1);
-            }
-            Integer port = -1;
-            try {
-                port = Integer.valueOf(splittedHostPort[1]);
-            } catch (NumberFormatException e) {
-                LOG.error("Invalid exist fe hosts: {}. will exit", existFeHosts);
-                System.exit(-1);
+            for (String host : splittedHosts) {
+                String[] splittedHostPort = host.split(":");
+                if (splittedHostPort.length != 2) {
+                    LOG.error("Invalid exist fe hosts: {}. will exit", existFeHosts);
+                    System.exit(-1);
+                }
+                Integer port = -1;
+                try {
+                    port = Integer.valueOf(splittedHostPort[1]);
+                } catch (NumberFormatException e) {
+                    LOG.error("Invalid exist fe hosts: {}. will exit", existFeHosts);
+                    System.exit(-1);
+                }
+
+                helperNodes.add(Pair.create(splittedHostPort[0], port));
             }
-            return Pair.create(splittedHostPort[0], port);
+
+            return helperNodes;
         }
 
         // No Frontend exist before.
@@ -280,7 +295,7 @@ public Pair<String, Integer> getHelperNode() {
 
         LOG.info("sorted fe host list: {}", feHostPorts);
 
         // 4. return the first one as helper
-        return feHostPorts.get(0);
+        return feHostPorts.subList(0, 1);
     }
 
     @Override
@@ -510,7 +525,7 @@ private boolean inspectNodeChange(List<Pair<String, Integer>> remoteHosts,
             }
 
             if (true) {
-                // TODO(cmy): For now, Deploy Manager dose not handle shrinking operations
+                LOG.info("For now, Deploy Manager does not handle shrinking operations");
                 continue;
             }
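getHelperNodes now keeps every entry of the comma-separated host:port list instead of only the first. The same parsing, reduced to a standalone helper that throws instead of calling System.exit so it can be unit-tested (the helper and its types are illustrative):

    import java.util.AbstractMap.SimpleEntry;
    import java.util.ArrayList;
    import java.util.List;

    // Illustrative: parse "ip1:port1,ip2:port2,..." into (host, port) pairs.
    static List<SimpleEntry<String, Integer>> parseHostPorts(String spec) {
        List<SimpleEntry<String, Integer>> nodes = new ArrayList<>();
        for (String hostPort : spec.split(",")) {
            String[] parts = hostPort.split(":");
            if (parts.length != 2) {
                throw new IllegalArgumentException("invalid host:port: " + hostPort);
            }
            nodes.add(new SimpleEntry<>(parts[0], Integer.valueOf(parts[1])));
        }
        return nodes;
    }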
diff --git a/fe/src/com/baidu/palo/deploy/impl/K8sDeployManager.java b/fe/src/com/baidu/palo/deploy/impl/K8sDeployManager.java
index 189fe51f42cb45..e0fe7018fb0e48 100644
--- a/fe/src/com/baidu/palo/deploy/impl/K8sDeployManager.java
+++ b/fe/src/com/baidu/palo/deploy/impl/K8sDeployManager.java
@@ -28,25 +28,24 @@ public class K8sDeployManager extends DeployManager {
 
     private static final Logger LOG = LogManager.getLogger(K8sDeployManager.class);
 
-    public static final String ENV_FE_SERVICE_NAME = "FE_SERVICE_NAME";
-    public static final String ENV_FE_NAMESPACE = "FE_NAMESPACE";
-    public static final String ENV_FE_OBSERVER_SERVICE_NAME = "FE_OBSERVER_SERVICE_NAME";
-    public static final String ENV_FE_OBSERVER_NAMESPACE = "FE_OBSERVER_NAMESPACE";
-    public static final String ENV_BE_SERVICE_NAME = "BE_SERVICE_NAME";
-    public static final String ENV_BE_NAMESPACE = "BE_NAMESPACE";
-    public static final String ENV_BROKER_SERVICE_NAME = "BROKER_SERVICE_NAME";
-    public static final String ENV_BROKER_NAMESPACE = "BROKER_NAMESPACE";
+    public static final String ENV_APP_NAMESPACE = "APP_NAMESPACE";
+    // each SERVICE (FE/BE/OBSERVER/BROKER) represents a module of Palo, such as Frontends, Backends, ...
+    // and each service has a name in k8s.
+    public static final String ENV_FE_SERVICE = "FE_SERVICE";
+    public static final String ENV_FE_OBSERVER_SERVICE = "FE_OBSERVER_SERVICE";
+    public static final String ENV_BE_SERVICE = "BE_SERVICE";
+    public static final String ENV_BROKER_SERVICE = "BROKER_SERVICE";
+    // we arbitrarily set all broker names to what ENV_BROKER_NAME specifies.
     public static final String ENV_BROKER_NAME = "BROKER_NAME";
 
     public static final String FE_PORT = "edit-log-port"; // k8s only support -, not _
     public static final String BE_PORT = "heartbeat-port";
     public static final String BROKER_PORT = "broker-port";
 
-    private String feNamespace;
-    private String observerNamespace;
-    private String beNamespace;
-    private String brokerNamespace;
+    // corresponding to the environment variable ENV_APP_NAMESPACE.
+    // An App represents a Palo cluster in K8s; it has a namespace, and the default namespace is 'default'.
+    private String appNamespace;
 
     private KubernetesClient client = null;
 
     // =======for test only==========
@@ -61,8 +60,7 @@ public class K8sDeployManager extends DeployManager {
 
     public K8sDeployManager(Catalog catalog, long intervalMs) {
         super(catalog, intervalMs);
-        initEnvVariables(ENV_FE_SERVICE_NAME, ENV_FE_OBSERVER_SERVICE_NAME, ENV_BE_SERVICE_NAME,
-                ENV_BROKER_SERVICE_NAME);
+        initEnvVariables(ENV_FE_SERVICE, ENV_FE_OBSERVER_SERVICE, ENV_BE_SERVICE, ENV_BROKER_SERVICE);
     }
 
     @Override
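The appNamespace comment above notes that a K8s namespace defaults to 'default'. The patch treats a missing APP_NAMESPACE as fatal; if a fallback were wanted instead, it would be a one-liner (illustrative, not what the code below does):

    import com.google.common.base.Strings;

    // Illustrative fallback; the patch exits instead when APP_NAMESPACE is unset.
    static String resolveAppNamespace() {
        String ns = System.getenv("APP_NAMESPACE");
        return Strings.isNullOrEmpty(ns) ? "default" : ns;
    }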
@@ -71,70 +69,48 @@ protected void initEnvVariables(String envElectableFeServiceGroup, String envObs
         super.initEnvVariables(envElectableFeServiceGroup, envObserverFeServiceGroup, envBackendServiceGroup,
                 envBrokerServiceGroup);
 
-        // namespaces
-        feNamespace = Strings.nullToEmpty(System.getenv(ENV_FE_NAMESPACE));
-        beNamespace = Strings.nullToEmpty(System.getenv(ENV_BE_NAMESPACE));
+        // namespace
+        appNamespace = Strings.nullToEmpty(System.getenv(ENV_APP_NAMESPACE));
 
-        // FE and BE namespace must exist
-        if (Strings.isNullOrEmpty(feNamespace) || Strings.isNullOrEmpty(beNamespace)) {
-            LOG.error("failed to init namespace. feNamespace: {}, beNamespace: {}",
-                    feNamespace, observerNamespace, beNamespace);
+        if (Strings.isNullOrEmpty(appNamespace)) {
+            LOG.error("failed to init namespace: " + ENV_APP_NAMESPACE);
             System.exit(-1);
         }
 
-        // observer namespace
-        observerNamespace = Strings.nullToEmpty(System.getenv(ENV_FE_OBSERVER_NAMESPACE));
-        if (Strings.isNullOrEmpty(observerNamespace)) {
-            LOG.warn("failed to init observer namespace.");
-            hasObserverService = false;
-        }
-
-        brokerNamespace = Strings.nullToEmpty(System.getenv(ENV_BROKER_NAMESPACE));
-        if (Strings.isNullOrEmpty(brokerNamespace)) {
-            LOG.warn("failed to init broker namespace.");
-            hasBrokerService = false;
-        }
-
-        LOG.info("get namespace. feNamespace: {}, observerNamespace: {}, beNamespace: {}, brokerNamespace: {}",
-                feNamespace, observerNamespace, beNamespace, brokerNamespace);
+        LOG.info("get namespace: {}", appNamespace);
     }
 
     @Override
     protected List<Pair<String, Integer>> getGroupHostPorts(String groupName) {
         // 1. get namespace and port name
-        String namespace = null;
         String portName = null;
         if (groupName.equals(electableFeServiceGroup)) {
-            namespace = feNamespace;
             portName = FE_PORT;
         } else if (groupName.equals(observerFeServiceGroup)) {
-            namespace = observerNamespace;
             portName = FE_PORT;
         } else if (groupName.equals(backendServiceGroup)) {
-            namespace = beNamespace;
             portName = BE_PORT;
         } else if (groupName.equals(brokerServiceGroup)) {
-            namespace = brokerNamespace;
             portName = BROKER_PORT;
         } else {
             LOG.warn("unknown service group name: {}", groupName);
             return null;
         }
-        Preconditions.checkNotNull(namespace);
+        Preconditions.checkNotNull(appNamespace);
         Preconditions.checkNotNull(portName);
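Step 1 above maps each service group to its named container port through an if/else chain. The same mapping can live in a lookup table, one line per service; a sketch that assumes the *ServiceGroup fields are already initialized:

    import com.google.common.collect.ImmutableMap;
    import java.util.Map;

    // Illustrative alternative to the step-1 chain; built once the group names are known.
    Map<String, String> portNameByGroup = ImmutableMap.of(
            electableFeServiceGroup, FE_PORT,
            observerFeServiceGroup, FE_PORT,
            backendServiceGroup, BE_PORT,
            brokerServiceGroup, BROKER_PORT);

    String portName = portNameByGroup.get(groupName); // null means unknown group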
 
         // 2. get endpoint
         Endpoints endpoints = null;
         try {
-            endpoints = endpoints(namespace, groupName);
+            endpoints = endpoints(appNamespace, groupName);
         } catch (Exception e) {
             LOG.warn("encounter exception when get endpoint from namespace {}, service: {}",
-                    namespace, groupName, e);
+                    appNamespace, groupName, e);
             return null;
         }
 
         if (endpoints == null) {
             // endpoints may be null if service does not exist;
-            LOG.warn("get null endpoints of namespace {} in service: {}", namespace, groupName);
+            LOG.warn("get null endpoints of namespace {} in service: {}", appNamespace, groupName);
             return null;
         }
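The next file, LocalFileDeployManager, reads a cluster.info file with one KEY=ip:port,ip:port,... line per service group. A standalone sketch of extracting one group's endpoints from such a line (the helper is illustrative; the real code below also takes a shared file lock while reading):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Illustrative: endpoints of one group from a line like "FE=10.0.0.1:9010,10.0.0.2:9010".
    static List<String> parseGroupLine(String line, String groupName) {
        if (!line.startsWith(groupName)) {
            return new ArrayList<>();
        }
        String[] parts = line.split("=");
        if (parts.length != 2 || parts[1].isEmpty()) {
            return new ArrayList<>();
        }
        return new ArrayList<>(Arrays.asList(parts[1].split(",")));
    }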
diff --git a/fe/src/com/baidu/palo/deploy/impl/LocalFileDeployManager.java b/fe/src/com/baidu/palo/deploy/impl/LocalFileDeployManager.java
index 01edf145b6153f..7c85f9f5c20de3 100644
--- a/fe/src/com/baidu/palo/deploy/impl/LocalFileDeployManager.java
+++ b/fe/src/com/baidu/palo/deploy/impl/LocalFileDeployManager.java
@@ -1,78 +1,134 @@
 package com.baidu.palo.deploy.impl;
 
 import com.baidu.palo.catalog.Catalog;
+import com.baidu.palo.common.AnalysisException;
 import com.baidu.palo.common.Pair;
 import com.baidu.palo.deploy.DeployManager;
 import com.baidu.palo.system.SystemInfoService;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
 import java.util.List;
+import java.util.Map;
 
 /*
- * This for test only.
- * The LocalEnvDeployManager will watch the change of the Local environment variables
- * 'ELECTABLE_FE_SERVICE_GROUP', 'OBSERVER_FE_SERVICE_GROUP' and 'BACKEND_SERVICE_GROUP'
- * which are 3 local files: fe_host.list, ob_host.list, be_host.list
- *
- * eg:
- *
- * export ELECTABLE_FE_SERVICE_GROUP=fe_host.list
- * export OBSERVER_FE_SERVICE_GROUP=ob_host.list
- * export BACKEND_SERVICE_GROUP=be_host.list
+ * This is for the Boxer2 Baidu BCC agent.
+ * It watches for changes of the file cluster.info, which contains:
+ *   FE=ip:port,ip:port,...
+ *   BE=ip:port,ip:port,...
+ *   BROKER=ip:port,ip:port,...
  */
 public class LocalFileDeployManager extends DeployManager {
     private static final Logger LOG = LogManager.getLogger(LocalFileDeployManager.class);
 
-    public static final String ENV_ELECTABLE_FE_SERVICE_GROUP = "ELECTABLE_FE_SERVICE_GROUP";
-    public static final String ENV_OBSERVER_FE_SERVICE_GROUP = "OBSERVER_FE_SERVICE_GROUP";
-    public static final String ENV_BACKEND_SERVICE_GROUP = "BACKEND_SERVICE_GROUP";
+    public static final String ENV_APP_NAMESPACE = "APP_NAMESPACE";
+    public static final String ENV_FE_SERVICE = "FE_SERVICE";
+    public static final String ENV_FE_OBSERVER_SERVICE = "FE_OBSERVER_SERVICE";
+    public static final String ENV_BE_SERVICE = "BE_SERVICE";
+    public static final String ENV_BROKER_SERVICE = "BROKER_SERVICE";
+
+    public static final String ENV_BROKER_NAME = "BROKER_NAME";
+
+    private String clusterInfoFile;
 
     public LocalFileDeployManager(Catalog catalog, long intervalMs) {
         super(catalog, intervalMs);
-        initEnvVariables(ENV_ELECTABLE_FE_SERVICE_GROUP, ENV_OBSERVER_FE_SERVICE_GROUP, ENV_BACKEND_SERVICE_GROUP, "");
+        initEnvVariables(ENV_FE_SERVICE, ENV_FE_OBSERVER_SERVICE, ENV_BE_SERVICE, ENV_BROKER_SERVICE);
+    }
+
+    @Override
+    protected void initEnvVariables(String envElectableFeServiceGroup, String envObserverFeServiceGroup,
+            String envBackendServiceGroup, String envBrokerServiceGroup) {
+        super.initEnvVariables(envElectableFeServiceGroup, envObserverFeServiceGroup, envBackendServiceGroup,
+                envBrokerServiceGroup);
+
+        // the cluster info file name is carried in the APP_NAMESPACE variable
+        clusterInfoFile = Strings.nullToEmpty(System.getenv(ENV_APP_NAMESPACE));
+
+        if (Strings.isNullOrEmpty(clusterInfoFile)) {
+            LOG.error("failed to get cluster info file name: " + ENV_APP_NAMESPACE);
+            System.exit(-1);
+        }
+
+        LOG.info("get cluster info file name: {}", clusterInfoFile);
     }
 
     @Override
     public List<Pair<String, Integer>> getGroupHostPorts(String groupName) {
         List<Pair<String, Integer>> result = Lists.newArrayList();
-        LOG.debug("begin to get group: {}", groupName);
-
-        File file = new File(groupName);
-        if (!file.exists()) {
-            LOG.warn("failed to get file from ");
-            return null;
-        }
+        LOG.debug("begin to get group: {} from file: {}", groupName, clusterInfoFile);
 
-        BufferedReader reader = null;
+        FileChannel channel = null;
+        FileLock lock = null;
+        BufferedReader bufferedReader = null;
         try {
-            reader = new BufferedReader(new FileReader(file));
-            String line = null;
-            while ((line = reader.readLine()) != null) {
-                try {
-                    Pair<String, Integer> hostPorts = SystemInfoService.validateHostAndPort(line);
+            FileInputStream stream = new FileInputStream(clusterInfoFile);
+            channel = stream.getChannel();
+            lock = channel.lock(0, Long.MAX_VALUE, true);
+
+            bufferedReader = new BufferedReader(new InputStreamReader(stream));
+            String str = null;
+            while ((str = bufferedReader.readLine()) != null) {
+                if (!str.startsWith(groupName)) {
+                    continue;
+                }
+                LOG.debug("read line: {}", str);
+                String[] parts = str.split("=");
+                if (parts.length != 2 || Strings.isNullOrEmpty(parts[1])) {
+                    return result;
+                }
+                String endpointList = parts[1];
+                String[] endpoints = endpointList.split(",");
+
+                for (String endpoint : endpoints) {
+                    Pair<String, Integer> hostPorts = SystemInfoService.validateHostAndPort(endpoint);
                     result.add(hostPorts);
-                } catch (Exception e) {
-                    LOG.warn("failed to read host port from {}", groupName, e);
-                    return null;
                 }
+
+                // only need one line
+                break;
             }
+        } catch (FileNotFoundException e) {
+            LOG.warn("file not found", e);
+            return null;
         } catch (IOException e) {
-            LOG.warn("failed to read from file.", e);
+            LOG.warn("failed to read file", e);
+            return null;
+        } catch (AnalysisException e) {
+            LOG.warn("failed to parse endpoint", e);
+            return null;
         } finally {
-            if (reader != null) {
+            if (bufferedReader != null) {
+                try {
+                    bufferedReader.close();
+                } catch (IOException e) {
+                    LOG.warn("failed to close buffered reader after reading file: {}", clusterInfoFile, e);
+                }
+            }
+            if (lock != null && channel.isOpen()) {
+                try {
+                    lock.release();
+                } catch (IOException e) {
+                    LOG.warn("failed to release lock after reading file: {}", clusterInfoFile, e);
+                }
+            }
+            if (channel != null && channel.isOpen()) {
                 try {
-                    reader.close();
-                } catch (IOException e2) {
-                    e2.printStackTrace();
-                    return null;
+                    channel.close();
+                } catch (IOException e) {
+                    LOG.warn("failed to close channel after reading file: {}", clusterInfoFile, e);
                 }
             }
         }
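The try/finally ladder above releases the shared lock and closes the channel and reader by hand. With try-with-resources the same shared-lock read collapses to a few lines, since FileLock has been AutoCloseable since Java 7; a sketch, assuming the writer side takes an exclusive lock on the same file:

    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.nio.channels.FileLock;

    // Illustrative: read one line under a shared (read) lock. Declaring the lock
    // last means it is released first, while the channel is still open.
    static String readFirstLineLocked(String path) throws IOException {
        try (FileInputStream stream = new FileInputStream(path);
             BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
             FileLock lock = stream.getChannel().lock(0, Long.MAX_VALUE, true /* shared */)) {
            return reader.readLine();
        }
    }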
LOG.warn("failed to parse endpoint", e); return null; } finally { - if (reader != null) { + if (bufferedReader != null) { + try { + bufferedReader.close(); + } catch (IOException e) { + LOG.warn("failed to close buffered reader after reading file: {}", clusterInfoFile, e); + } + } + if (lock != null && channel.isOpen()) { + try { + lock.release(); + } catch (IOException e) { + LOG.warn("failed to release lock after reading file: {}", clusterInfoFile, e); + } + } + if (channel != null && channel.isOpen()) { try { - reader.close(); - } catch (IOException e2) { - e2.printStackTrace(); - return null; + channel.close(); + } catch (IOException e) { + LOG.warn("failed to close channel after reading file: {}", clusterInfoFile, e); } } } @@ -80,4 +136,22 @@ public List> getGroupHostPorts(String groupName) { LOG.info("get hosts from {}: {}", groupName, result); return result; } + + @Override + protected Map>> getBrokerGroupHostPorts() { + List> hostPorts = getGroupHostPorts(brokerServiceGroup); + if (hostPorts == null) { + return null; + } + final String brokerName = System.getenv(ENV_BROKER_NAME); + if (Strings.isNullOrEmpty(brokerName)) { + LOG.error("failed to get broker name from env: {}", ENV_BROKER_NAME); + System.exit(-1); + } + + Map>> brokers = Maps.newHashMap(); + brokers.put(brokerName, hostPorts); + LOG.info("get brokers from file: {}", brokers); + return brokers; + } } diff --git a/fe/src/com/baidu/palo/metric/MetricRepo.java b/fe/src/com/baidu/palo/metric/MetricRepo.java index 7fee0eabaf0f3e..de8896fc7e4f98 100644 --- a/fe/src/com/baidu/palo/metric/MetricRepo.java +++ b/fe/src/com/baidu/palo/metric/MetricRepo.java @@ -98,7 +98,7 @@ public Integer getValue() { if (!Catalog.getInstance().isMaster()) { return 0; } - if (jobType != JobType.SCHEMA_CHANGE) { + if (jobType == JobType.SCHEMA_CHANGE) { return alter.getSchemaChangeHandler().getAlterJobNumByState(com.baidu.palo.alter.AlterJob.JobState.RUNNING); } else { return alter.getRollupHandler().getAlterJobNumByState(com.baidu.palo.alter.AlterJob.JobState.RUNNING); diff --git a/fe/src/com/baidu/palo/planner/SingleNodePlanner.java b/fe/src/com/baidu/palo/planner/SingleNodePlanner.java index 7acbf7b0ef7c99..525bd4ce2d1747 100644 --- a/fe/src/com/baidu/palo/planner/SingleNodePlanner.java +++ b/fe/src/com/baidu/palo/planner/SingleNodePlanner.java @@ -1112,7 +1112,7 @@ public boolean apply(Expr expr) { // Unset the On-clause flag of the migrated conjuncts because the migrated conjuncts // apply to the post-join/agg/analytic result of the inline view. 
diff --git a/fe/src/com/baidu/palo/planner/SingleNodePlanner.java b/fe/src/com/baidu/palo/planner/SingleNodePlanner.java
index 7acbf7b0ef7c99..525bd4ce2d1747 100644
--- a/fe/src/com/baidu/palo/planner/SingleNodePlanner.java
+++ b/fe/src/com/baidu/palo/planner/SingleNodePlanner.java
@@ -1112,7 +1112,7 @@ public boolean apply(Expr expr) {
         // Unset the On-clause flag of the migrated conjuncts because the migrated conjuncts
         // apply to the post-join/agg/analytic result of the inline view.
         for (Expr e: viewPredicates) e.setIsOnClauseConjunct(false);
-        inlineViewRef.getAnalyzer().registerConjuncts(viewPredicates);
+        inlineViewRef.getAnalyzer().registerConjuncts(viewPredicates, inlineViewRef.getAllTupleIds());
 
         // mark (fully resolve) slots referenced by remaining unassigned conjuncts as
         // materialized
@@ -1424,7 +1424,15 @@ private PlanNode createUnionPlan(UnionStmt unionStmt, Analyzer analyzer, long de
             for (UnionStmt.UnionOperand op: unionStmt.getOperands()) {
                 List<Expr> opConjuncts = Expr.substituteList(conjuncts, op.getSmap(), analyzer, false);
-                op.getAnalyzer().registerConjuncts(opConjuncts);
+                if (op.getQueryStmt() instanceof SelectStmt) {
+                    final SelectStmt select = (SelectStmt) op.getQueryStmt();
+                    op.getAnalyzer().registerConjuncts(opConjuncts, select.getTableRefIds());
+                } else if (op.getQueryStmt() instanceof UnionStmt) {
+                    final UnionStmt union = (UnionStmt) op.getQueryStmt();
+                    op.getAnalyzer().registerConjuncts(opConjuncts, union.getTupleId().asList());
+                } else {
+                    Preconditions.checkArgument(false);
+                }
             }
             analyzer.markConjunctsAssigned(conjuncts);
         } else {
diff --git a/fs_brokers/apache_hdfs_broker/src/com/baidu/palo/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/com/baidu/palo/broker/hdfs/FileSystemManager.java
index f921540e65d9c6..55ee4310c4d74d 100644
--- a/fs_brokers/apache_hdfs_broker/src/com/baidu/palo/broker/hdfs/FileSystemManager.java
+++ b/fs_brokers/apache_hdfs_broker/src/com/baidu/palo/broker/hdfs/FileSystemManager.java
@@ -231,7 +231,6 @@ public TBrokerFD openReader(String clientId, String path, long startOffset, Map<
     public ByteBuffer pread(TBrokerFD fd, long offset, long length) {
         FSDataInputStream fsDataInputStream = clientContextManager.getFsDataInputStream(fd);
         synchronized (fsDataInputStream) {
-            long currentStreamOffset;
             try {
                 currentStreamOffset = fsDataInputStream.getPos();