diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 8b484ae2973f51..7523750e50fd36 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -52,6 +52,17 @@ using std::vector; namespace palo { +const uint32_t DOWNLOAD_FILE_MAX_RETRY = 3; +const uint32_t TASK_FINISH_MAX_RETRY = 3; +const uint32_t PUSH_MAX_RETRY = 1; +const uint32_t REPORT_TASK_WORKER_COUNT = 1; +const uint32_t REPORT_DISK_STATE_WORKER_COUNT = 1; +const uint32_t REPORT_OLAP_TABLE_WORKER_COUNT = 1; +const uint32_t LIST_REMOTE_FILE_TIMEOUT = 15; +const std::string HTTP_REQUEST_PREFIX = "/api/_tablet/_download?"; +const std::string HTTP_REQUEST_TOKEN_PARAM = "&token="; +const std::string HTTP_REQUEST_FILE_PARAM = "&file="; + std::atomic_ulong TaskWorkerPool::_s_report_version(time(NULL) * 10000); MutexLock TaskWorkerPool::_s_task_signatures_lock; MutexLock TaskWorkerPool::_s_running_task_user_count_lock; @@ -296,7 +307,7 @@ uint32_t TaskWorkerPool::_get_next_task_index( if (task.__isset.resource_info) { user = task.resource_info.user; } - + if (priority == TPriority::HIGH) { if (task.__isset.priority && task.priority == TPriority::HIGH) { index = i; @@ -305,7 +316,7 @@ uint32_t TaskWorkerPool::_get_next_task_index( continue; } } - + if (improper_users.count(user) != 0) { continue; } @@ -337,12 +348,12 @@ uint32_t TaskWorkerPool::_get_next_task_index( improper_users.insert(user); } } - + if (index == -1) { if (priority == TPriority::HIGH) { return index; } - + index = 0; if (tasks[0].__isset.resource_info) { user = tasks[0].resource_info.user; @@ -367,7 +378,7 @@ void* TaskWorkerPool::_create_table_worker_thread_callback(void* arg_this) { TAgentTaskRequest agent_task_req; TCreateTabletReq create_tablet_req; { - lock_guard worker_thread_lock(worker_pool_this->_worker_thread_lock); + lock_guard worker_thread_lock(worker_pool_this->_worker_thread_lock); while (worker_pool_this->_tasks.empty()) { worker_pool_this->_worker_thread_condition_lock.wait(); } @@ -418,7 +429,7 @@ void* TaskWorkerPool::_drop_table_worker_thread_callback(void* arg_this) { TAgentTaskRequest agent_task_req; TDropTabletReq drop_tablet_req; { - lock_guard worker_thread_lock(worker_pool_this->_worker_thread_lock); + lock_guard worker_thread_lock(worker_pool_this->_worker_thread_lock); while (worker_pool_this->_tasks.empty()) { worker_pool_this->_worker_thread_condition_lock.wait(); } @@ -639,7 +650,7 @@ void* TaskWorkerPool::_push_worker_thread_callback(void* arg_this) { // Try to register to cgroups_mgr CgroupsMgr::apply_system_cgroup(); TaskWorkerPool* worker_pool_this = (TaskWorkerPool*)arg_this; - + // gen high priority worker thread TPriority::type priority = TPriority::NORMAL; int32_t push_worker_count_high_priority = config::push_worker_count_high_priority; @@ -676,7 +687,7 @@ void* TaskWorkerPool::_push_worker_thread_callback(void* arg_this) { worker_pool_this->_worker_thread_condition_lock.notify(); break; } - + agent_task_req = worker_pool_this->_tasks[index]; if (agent_task_req.__isset.resource_info) { user = agent_task_req.resource_info.user; @@ -684,7 +695,7 @@ void* TaskWorkerPool::_push_worker_thread_callback(void* arg_this) { push_req = agent_task_req.push_req; worker_pool_this->_tasks.erase(worker_pool_this->_tasks.begin() + index); } while (0); - + #ifndef BE_TEST if (index < 0) { // there is no high priority task in queue @@ -692,7 +703,7 @@ void* TaskWorkerPool::_push_worker_thread_callback(void* arg_this) { continue; } #endif - + OLAP_LOG_INFO("get push task. signature: %ld, user: %s, priority: %d", agent_task_req.signature, user.c_str(), priority); @@ -734,11 +745,11 @@ void* TaskWorkerPool::_push_worker_thread_callback(void* arg_this) { } else { status = PALO_TASK_REQUEST_ERROR; } - + // Return result to fe vector error_msgs; TStatus task_status; - + TFinishTaskRequest finish_task_request; finish_task_request.__set_backend(worker_pool_this->_backend); finish_task_request.__set_task_type(agent_task_req.task_type); @@ -829,7 +840,7 @@ void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) { agent_task_req.signature); error_msgs.push_back("clone get local root path failed."); status = PALO_ERROR; - } + } } string src_file_path; @@ -861,8 +872,8 @@ void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) { error_msgs.push_back("load header failed."); status = PALO_ERROR; } - } - + } + #ifndef BE_TEST // Clean useless dir, if failed, ignore it. if (status != PALO_SUCCESS && status != PALO_CREATE_TABLE_EXIST) { @@ -910,7 +921,7 @@ void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) { && (tablet_info.version < clone_req.committed_version || (tablet_info.version == clone_req.committed_version && tablet_info.version_hash != clone_req.committed_version_hash))) { - + // we need to check if this cloned table's version is what we expect. // if not, maybe this is a stale remaining table which is waiting for drop. // we drop it. @@ -922,7 +933,7 @@ void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) { agent_task_req.signature, tablet_info.version, tablet_info.version_hash, clone_req.committed_version, clone_req.committed_version_hash); - + TDropTabletReq drop_req; drop_req.tablet_id = clone_req.tablet_id; drop_req.schema_hash = clone_req.schema_hash; @@ -932,7 +943,7 @@ void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) { OLAP_LOG_WARNING( "drop stale cloned table failed! tabelt id: %ld", clone_req.tablet_id); } - + status = PALO_ERROR; } else { OLAP_LOG_INFO("clone get tablet info success. " @@ -971,7 +982,7 @@ void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) { worker_pool_this->_remove_task_info(agent_task_req.task_type, agent_task_req.signature, ""); #ifndef BE_TEST } -#endif +#endif return (void*)0; } @@ -985,6 +996,15 @@ AgentStatus TaskWorkerPool::_clone_copy( vector* error_msgs) { AgentStatus status = PALO_SUCCESS; + + std::string token; + { + uint32_t cluster_id = OLAPRootPath::get_instance()->effective_cluster_id(); + stringstream token_stream; + token_stream << cluster_id; + token = token_stream.str(); + } + for (auto src_backend : clone_req.src_backends) { stringstream http_host_stream; http_host_stream << "http://" << src_backend.host << ":" << src_backend.http_port; @@ -1067,7 +1087,9 @@ AgentStatus TaskWorkerPool::_clone_copy( // Get remove dir file list FileDownloader::FileDownloaderParam downloader_param; - downloader_param.remote_file_path = http_host + HTTP_REQUEST_PREFIX + src_file_full_path; + downloader_param.remote_file_path = http_host + HTTP_REQUEST_PREFIX + + HTTP_REQUEST_TOKEN_PARAM + token + + HTTP_REQUEST_FILE_PARAM + src_file_full_path; downloader_param.curl_opt_timeout = LIST_REMOTE_FILE_TIMEOUT; #ifndef BE_TEST @@ -1149,8 +1171,9 @@ AgentStatus TaskWorkerPool::_clone_copy( // Get copy from remote for (auto file_name : file_name_list) { download_retry_time = 0; - downloader_param.remote_file_path = - http_host + HTTP_REQUEST_PREFIX + src_file_full_path + file_name; + downloader_param.remote_file_path = http_host + HTTP_REQUEST_PREFIX + + HTTP_REQUEST_TOKEN_PARAM + token + + HTTP_REQUEST_FILE_PARAM + src_file_full_path + file_name; downloader_param.local_file_path = local_file_full_path + file_name; // Get file length @@ -1200,7 +1223,7 @@ AgentStatus TaskWorkerPool::_clone_copy( status = PALO_ERROR; break; } - + estimate_time_out = file_size / config::download_low_speed_limit_kbps / 1024; if (estimate_time_out < config::download_low_speed_time) { estimate_time_out = config::download_low_speed_time; @@ -1240,7 +1263,7 @@ AgentStatus TaskWorkerPool::_clone_copy( src_host->host.c_str(), downloader_param.remote_file_path.c_str(), signature, file_size, local_file_size); - download_status = PALO_FILE_DOWNLOAD_FAILED; + download_status = PALO_FILE_DOWNLOAD_FAILED; } else { chmod(downloader_param.local_file_path.c_str(), S_IRUSR | S_IWUSR); break; @@ -1287,7 +1310,7 @@ AgentStatus TaskWorkerPool::_clone_copy( if (status == PALO_SUCCESS) { break; } - } // clone copy from one backend + } // clone copy from one backend return status; } @@ -1302,7 +1325,7 @@ void* TaskWorkerPool::_storage_medium_migrate_worker_thread_callback(void* arg_t TAgentTaskRequest agent_task_req; TStorageMediumMigrateReq storage_medium_migrate_req; { - lock_guard worker_thread_lock(worker_pool_this->_worker_thread_lock); + lock_guard worker_thread_lock(worker_pool_this->_worker_thread_lock); while (worker_pool_this->_tasks.empty()) { worker_pool_this->_worker_thread_condition_lock.wait(); } @@ -1410,7 +1433,7 @@ void* TaskWorkerPool::_check_consistency_worker_thread_callback(void* arg_this) TAgentTaskRequest agent_task_req; TCheckConsistencyReq check_consistency_req; { - lock_guard worker_thread_lock(worker_pool_this->_worker_thread_lock); + lock_guard worker_thread_lock(worker_pool_this->_worker_thread_lock); while (worker_pool_this->_tasks.empty()) { worker_pool_this->_worker_thread_condition_lock.wait(); } @@ -1479,7 +1502,7 @@ void* TaskWorkerPool::_report_task_worker_thread_callback(void* arg_this) { worker_pool_this->_master_info.network_address.port); TMasterResult result; AgentStatus status = worker_pool_this->_master_client->report(request, &result); - + if (status == PALO_SUCCESS) { OLAP_LOG_INFO("finish report task success. return code: %d", result.status.status_code); @@ -1677,7 +1700,7 @@ void* TaskWorkerPool::_upload_worker_thread_callback(void* arg_this) { worker_pool_this->_finish_task(finish_task_request); worker_pool_this->_remove_task_info(agent_task_req.task_type, agent_task_req.signature, ""); #ifndef BE_TEST - } + } #endif return (void*)0; } @@ -1725,7 +1748,7 @@ void* TaskWorkerPool::_restore_worker_thread_callback(void* arg_this) { OLAP_LOG_WARNING("Write remote source info to file failed. Path: %s", info_file_path.c_str()); } - + // Get local disk to restore from olap string local_shard_root_path; if (status_code == TStatusCode::OK) { @@ -1742,7 +1765,7 @@ void* TaskWorkerPool::_restore_worker_thread_callback(void* arg_this) { stringstream local_file_path_stream; local_file_path_stream << local_shard_root_path << "/" << restore_request.tablet_id << "/"; string local_file_path(local_file_path_stream.str()); - + // Download files from remote source if (status_code == TStatusCode::OK) { string command = "sh " + config::trans_file_tool_path + " " + label + " download " + @@ -1757,7 +1780,7 @@ void* TaskWorkerPool::_restore_worker_thread_callback(void* arg_this) { OLAP_LOG_WARNING("Download file failed. Error: %s", errmsg.c_str()); } } - + // Delete tmp file boost::filesystem::path file_path(info_file_path); if (boost::filesystem::exists(file_path)) { @@ -1790,7 +1813,7 @@ void* TaskWorkerPool::_restore_worker_thread_callback(void* arg_this) { file_path_suffix != ".dat") { continue; } - + // Get new file name stringstream new_file_name_stream; char sperator = '_'; @@ -1837,7 +1860,7 @@ void* TaskWorkerPool::_restore_worker_thread_callback(void* arg_this) { restore_request.schema_hash, agent_task_req.signature, &tablet_info); - + if (get_tablet_info_status != PALO_SUCCESS) { OLAP_LOG_WARNING("Restore success, but get new tablet info failed." "tablet_id: %ld, schema_hash: %ld, signature: %ld.", @@ -1861,7 +1884,7 @@ void* TaskWorkerPool::_restore_worker_thread_callback(void* arg_this) { worker_pool_this->_finish_task(finish_task_request); worker_pool_this->_remove_task_info(agent_task_req.task_type, agent_task_req.signature, ""); #ifndef BE_TEST - } + } #endif return (void*)0; } @@ -1969,7 +1992,7 @@ void* TaskWorkerPool::_release_snapshot_thread_callback(void* arg_this) { OLAP_LOG_INFO("release_snapshot success. snapshot_path: %s, status: %d", snapshot_path.c_str(), release_snapshot_status); } - + task_status.__set_status_code(status_code); task_status.__set_error_msgs(error_msgs); diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h index 02fe0bdf5c7c87..f013672b28daee 100644 --- a/be/src/agent/task_worker_pool.h +++ b/be/src/agent/task_worker_pool.h @@ -31,15 +31,6 @@ namespace palo { -const uint32_t DOWNLOAD_FILE_MAX_RETRY = 3; -const uint32_t TASK_FINISH_MAX_RETRY = 3; -const uint32_t PUSH_MAX_RETRY = 1; -const uint32_t REPORT_TASK_WORKER_COUNT = 1; -const uint32_t REPORT_DISK_STATE_WORKER_COUNT = 1; -const uint32_t REPORT_OLAP_TABLE_WORKER_COUNT = 1; -const uint32_t LIST_REMOTE_FILE_TIMEOUT = 15; -const std::string HTTP_REQUEST_PREFIX = "/api/_tablet/_download?file="; - class TaskWorkerPool { public: enum TaskWorkerType { diff --git a/be/src/common/config.h b/be/src/common/config.h index fc92dabf07fa0c..1ccc8d69fce79f 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -312,6 +312,9 @@ namespace config { // "If true, Kudu features will be disabled." CONF_Bool(disable_kudu, "false") + // to forward compatibility, will be removed later + CONF_Bool(enable_token_check, "true"); + } // namespace config } // namespace palo diff --git a/be/src/http/download_action.cpp b/be/src/http/download_action.cpp index 59bdd301563545..39eb29dacd148f 100644 --- a/be/src/http/download_action.cpp +++ b/be/src/http/download_action.cpp @@ -28,23 +28,40 @@ #include "http/http_status.h" #include "util/defer_op.h" #include "util/file_utils.h" +#include "util/filesystem_util.h" +#include "util/string_parser.hpp" +#include "runtime/exec_env.h" namespace palo { const std::string FILE_PARAMETER = "file"; const std::string DB_PARAMETER = "db"; const std::string LABEL_PARAMETER = "label"; +const std::string TOKEN_PARAMETER = "token"; -DownloadAction::DownloadAction(ExecEnv* exec_env, const std::string& base_dir) : - _exec_env(exec_env), - _base_dir(base_dir) { +DownloadAction::DownloadAction(ExecEnv* exec_env, const std::vector& allow_dirs) : + _exec_env(exec_env), + _allow_paths(allow_dirs) { } void DownloadAction::handle(HttpRequest *req, HttpChannel *channel) { - LOG(INFO) << "accept one request " << req->debug_string(); + LOG(INFO) << "accept one download request " << req->debug_string(); // add tid to cgroup in order to limit read bandwidth CgroupsMgr::apply_system_cgroup(); + + // check token + Status status; + if (config::enable_token_check) { + status = check_token(req); + if (!status.ok()) { + std::string error_msg = status.get_error_msg(); + HttpResponse response(HttpStatus::OK, &error_msg); + channel->send_response(response); + return; + } + } + // Get 'file' parameter, then assembly file absolute path const std::string& file_path = req->param(FILE_PARAMETER); if (file_path.empty()) { @@ -54,28 +71,21 @@ void DownloadAction::handle(HttpRequest *req, HttpChannel *channel) { channel->send_response(response); return; } - std::string file_absolute_path = _base_dir; - if (!file_absolute_path.empty()) { - const std::string& db_name = req->param(DB_PARAMETER); - if (!db_name.empty()) { - file_absolute_path += "/" + db_name; - } - const std::string& label_name = req->param(LABEL_PARAMETER); - if (!label_name.empty()) { - file_absolute_path += "/" + label_name; - } - file_absolute_path += "/" + file_path; - } else { - // in tablet download, a absolute path is given. - file_absolute_path = file_path; + + status = check_path(file_path); + if (!status.ok()) { + std::string error_msg = status.get_error_msg(); + HttpResponse response(HttpStatus::OK, &error_msg); + channel->send_response(response); + return; } - VLOG_ROW << "absolute download path: " << file_absolute_path; + VLOG_ROW << "absolute download path: " << file_path; - if (FileUtils::is_dir(file_absolute_path)) { - do_dir_response(file_absolute_path, req, channel); + if (FileUtils::is_dir(file_path)) { + do_dir_response(file_path, req, channel); return; } else { - do_file_response(file_absolute_path, req, channel); + do_file_response(file_path, req, channel); } LOG(INFO) << "deal with requesst finished! "; @@ -102,8 +112,7 @@ void DownloadAction::do_dir_response( HttpResponse response(HttpStatus::OK, &result_str); channel->send_response(response); return; -} - +} void DownloadAction::do_file_response( const std::string& file_path, HttpRequest *req, HttpChannel *channel) { @@ -215,5 +224,36 @@ std::string DownloadAction::get_content_type(const std::string& file_name) { return std::string(); } +Status DownloadAction::check_token(HttpRequest *req) { + const std::string& token_str = req->param(TOKEN_PARAMETER); + if (token_str.empty()) { + return Status("token is not specified."); + } + + StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS; + int32_t token = StringParser::string_to_int( + token_str.c_str(), token_str.size(), &parse_result); + if (parse_result != StringParser::PARSE_SUCCESS) { + return Status("token format is wrong."); + } + + int32_t local_token = static_cast(_exec_env->cluster_id()); + if (token != local_token) { + return Status("invalid token."); + } + + return Status::OK; +} + +Status DownloadAction::check_path(const std::string& file_path) { + for (auto& allow_path : _allow_paths) { + if (FileSystemUtil::contain_path(allow_path, file_path)) { + return Status::OK; + } + } + + return Status("file path Not Allowed."); +} + } // end namespace palo diff --git a/be/src/http/download_action.h b/be/src/http/download_action.h index 2c62296d9781b1..3d843d768b2f7f 100644 --- a/be/src/http/download_action.h +++ b/be/src/http/download_action.h @@ -13,10 +13,8 @@ // specific language governing permissions and limitations // under the License. -#ifndef BDG_PALO_BE_SRC_COMMON_UTIL_DOWNLOAD_ACTION_H -#define BDG_PALO_BE_SRC_COMMON_UTIL_DOWNLOAD_ACTION_H - -#include +#ifndef BDG_PALO_BE_SRC_HTTP_DOWNLOAD_ACTION_H +#define BDG_PALO_BE_SRC_HTTP_DOWNLOAD_ACTION_H #include "exec/csv_scanner.h" #include "exec/scan_node.h" @@ -30,16 +28,19 @@ class ExecEnv; // // TODO(lingbin): implements two useful header ('If-Modified-Since' and 'RANGE') to reduce // transmission consumption. -// We use parameter named 'file' to specify the static resource path, it relative to the -// root path of http server. +// We use parameter named 'file' to specify the static resource path, it is an absolute path. class DownloadAction : public HttpHandler { public: - DownloadAction(ExecEnv* exec_env, const std::string& base_dir); + DownloadAction(ExecEnv* exec_env, const std::vector& allow_dirs); virtual ~DownloadAction() {} virtual void handle(HttpRequest *req, HttpChannel *channel); + private: + Status check_token(HttpRequest *req); + Status check_path(const std::string& path); + void do_file_response(const std::string& dir_path, HttpRequest *req, HttpChannel *channel); void do_dir_response(const std::string& dir_path, HttpRequest *req, HttpChannel *channel); @@ -54,10 +55,10 @@ class DownloadAction : public HttpHandler { std::string get_content_type(const std::string& file_name); ExecEnv* _exec_env; - std::string _base_dir; + std::vector _allow_paths; }; // end class DownloadAction } // end namespace palo -#endif // BDG_PALO_BE_SRC_COMMON_UTIL_DOWNLOAD_ACTION_H +#endif // BDG_PALO_BE_SRC_HTTP_DOWNLOAD_ACTION_H diff --git a/be/src/runtime/etl_job_mgr.cpp b/be/src/runtime/etl_job_mgr.cpp index 574f15df13b851..e7fa33c769907d 100644 --- a/be/src/runtime/etl_job_mgr.cpp +++ b/be/src/runtime/etl_job_mgr.cpp @@ -39,7 +39,9 @@ namespace palo { std::string EtlJobMgr::to_http_path(const std::string& file_name) { std::stringstream url; url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port - << "/api/_download_load?file=" << file_name; + << "/api/_download_load?" + << "token=" << _exec_env->cluster_id() + << "&file=" << file_name; return url.str(); } diff --git a/be/src/runtime/etl_job_mgr.h b/be/src/runtime/etl_job_mgr.h index c0bfaf95310fc6..7bdde9ffa08e99 100644 --- a/be/src/runtime/etl_job_mgr.h +++ b/be/src/runtime/etl_job_mgr.h @@ -87,6 +87,7 @@ class EtlJobMgr : public RestMonitorIface { void finalize_job(PlanFragmentExecutor* executor); virtual void debug(std::stringstream& ss); + private: std::string to_http_path(const std::string& file_path); diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index 2fcda7f501ebd1..4a73516e06e40e 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -51,6 +51,7 @@ #include "http/download_action.h" #include "http/monitor_action.h" #include "http/http_method.h" +#include "olap/olap_rootpath.h" #include "util/network_util.h" #include "util/bfd_parser.h" #include "runtime/etl_job_mgr.h" @@ -157,51 +158,7 @@ Status ExecEnv::start_services() { // Start services in order to ensure that dependencies between them are met if (_enable_webserver) { - add_default_path_handlers(_web_page_handler.get(), _mem_tracker.get()); - _webserver->register_handler(HttpMethod::PUT, - "/api/{db}/{table}/_load", - new MiniLoadAction(this)); - DownloadAction* download_action = new DownloadAction(this, ""); - // = new DownloadAction(this, config::mini_load_download_path); - _webserver->register_handler(HttpMethod::GET, "/api/_download_load", download_action); - _webserver->register_handler(HttpMethod::HEAD, "/api/_download_load", download_action); - - DownloadAction* tablet_download_action = new DownloadAction(this, ""); - _webserver->register_handler(HttpMethod::HEAD, - "/api/_tablet/_download", - tablet_download_action); - _webserver->register_handler(HttpMethod::GET, - "/api/_tablet/_download", - tablet_download_action); - - // Register monitor - MonitorAction* monitor_action = new MonitorAction(); - monitor_action->register_module("etl_mgr", etl_job_mgr()); - monitor_action->register_module("fragment_mgr", fragment_mgr()); - _webserver->register_handler(HttpMethod::GET, "/_monitor/{module}", monitor_action); - - // Register BE health action - HealthAction* health_action = new HealthAction(this); - _webserver->register_handler(HttpMethod::GET, "/api/health", health_action); - - // register pprof actions - PprofActions::setup(this, _webserver.get()); - -#ifndef BE_TEST - // Register BE checksum action - ChecksumAction* checksum_action = new ChecksumAction(this); - _webserver->register_handler(HttpMethod::GET, "/api/checksum", checksum_action); - - // Register BE reload tablet action - ReloadTabletAction* reload_tablet_action = new ReloadTabletAction(this); - _webserver->register_handler(HttpMethod::GET, "/api/reload_tablet", reload_tablet_action); - - // Register BE snapshot action - SnapshotAction* snapshot_action = new SnapshotAction(this); - _webserver->register_handler(HttpMethod::GET, "/api/snapshot", snapshot_action); -#endif - - RETURN_IF_ERROR(_webserver->start()); + RETURN_IF_ERROR(start_webserver()); } else { LOG(INFO) << "Webserver is disabled"; } @@ -212,4 +169,60 @@ Status ExecEnv::start_services() { return Status::OK; } +uint32_t ExecEnv::cluster_id() { + return OLAPRootPath::get_instance()->effective_cluster_id(); +} + +Status ExecEnv::start_webserver() { + add_default_path_handlers(_web_page_handler.get(), _mem_tracker.get()); + _webserver->register_handler(HttpMethod::PUT, + "/api/{db}/{table}/_load", + new MiniLoadAction(this)); + + std::vector allow_paths; + OLAPRootPath::get_instance()->get_all_available_root_path(&allow_paths); + DownloadAction* download_action = new DownloadAction(this, allow_paths); + // = new DownloadAction(this, config::mini_load_download_path); + _webserver->register_handler(HttpMethod::GET, "/api/_download_load", download_action); + _webserver->register_handler(HttpMethod::HEAD, "/api/_download_load", download_action); + + DownloadAction* tablet_download_action = new DownloadAction(this, allow_paths); + _webserver->register_handler(HttpMethod::HEAD, + "/api/_tablet/_download", + tablet_download_action); + _webserver->register_handler(HttpMethod::GET, + "/api/_tablet/_download", + tablet_download_action); + + // Register monitor + MonitorAction* monitor_action = new MonitorAction(); + monitor_action->register_module("etl_mgr", etl_job_mgr()); + monitor_action->register_module("fragment_mgr", fragment_mgr()); + _webserver->register_handler(HttpMethod::GET, "/_monitor/{module}", monitor_action); + + // Register BE health action + HealthAction* health_action = new HealthAction(this); + _webserver->register_handler(HttpMethod::GET, "/api/health", health_action); + + // register pprof actions + PprofActions::setup(this, _webserver.get()); + +#ifndef BE_TEST + // Register BE checksum action + ChecksumAction* checksum_action = new ChecksumAction(this); + _webserver->register_handler(HttpMethod::GET, "/api/checksum", checksum_action); + + // Register BE reload tablet action + ReloadTabletAction* reload_tablet_action = new ReloadTabletAction(this); + _webserver->register_handler(HttpMethod::GET, "/api/reload_tablet", reload_tablet_action); + + // Register BE snapshot action + SnapshotAction* snapshot_action = new SnapshotAction(this); + _webserver->register_handler(HttpMethod::GET, "/api/snapshot", snapshot_action); +#endif + + RETURN_IF_ERROR(_webserver->start()); + return Status::OK; +} + } diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 69193b758f9bc0..82b1fcf782fb6a 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -74,6 +74,8 @@ class ExecEnv { // declarations for classes in scoped_ptrs. virtual ~ExecEnv(); + uint32_t cluster_id(); + DataStreamMgr* stream_mgr() { return _stream_mgr.get(); } @@ -101,8 +103,8 @@ class ExecEnv { MemTracker* process_mem_tracker() { return _mem_tracker.get(); } - PoolMemTrackerRegistry* pool_mem_trackers() { - return _pool_mem_trackers.get(); + PoolMemTrackerRegistry* pool_mem_trackers() { + return _pool_mem_trackers.get(); } ThreadResourceMgr* thread_mgr() { return _thread_mgr.get(); @@ -162,6 +164,7 @@ class ExecEnv { Status init_for_tests(); private: + Status start_webserver(); // Leave protected so that subclasses can override boost::scoped_ptr _stream_mgr; boost::scoped_ptr _result_mgr; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 6efee680119e2f..97988ba18af3a7 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -210,7 +210,9 @@ void FragmentExecState::callback(const Status& status, RuntimeProfile* profile, std::string FragmentExecState::to_http_path(const std::string& file_name) { std::stringstream url; url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port - << "/api/_download_load?file=" << file_name; + << "/api/_download_load?" + << "token=" << _exec_env->cluster_id() + << "&file=" << file_name; return url.str(); } diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index ceba0b4cc3f021..adc614c9ba859e 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -382,10 +382,6 @@ class RuntimeState { return _load_dir; } - const void set_load_dir(std::string& dir) { - _load_dir = dir; - } - void set_load_job_id(int64_t job_id) { _load_job_id = job_id; } @@ -426,10 +422,6 @@ class RuntimeState { return _error_log_file_path; } - const void set_error_log_file_path(const std::string& file_path) { - _error_log_file_path = file_path; - } - // TODO(lingbin): remove this file error method after mysql error exporter is stable. void append_error_msg_to_file(const std::string& line, const std::string& error_msg); diff --git a/be/src/util/filesystem_util.cc b/be/src/util/filesystem_util.cc index 9f5c7c74fcc348..e9911e4132e456 100644 --- a/be/src/util/filesystem_util.cc +++ b/be/src/util/filesystem_util.cc @@ -170,5 +170,36 @@ uint64_t FileSystemUtil::max_num_file_handles() { return 0ul; } +// NOTE: the parent_path and sub_path can either dir or file. +// return true if patent_path == sub_path +bool FileSystemUtil::contain_path( + const std::string& parent_path, const std::string& sub_path) { + boost::filesystem::path parent(parent_path); + boost::filesystem::path sub(sub_path); + parent = parent.lexically_normal(); + sub = sub.lexically_normal(); + + if (parent == sub) { + return true; + } + + if (parent.filename() == ".") { + parent.remove_filename(); + } + + // We're also not interested in the file's name. + if (sub.has_filename()) { + sub.remove_filename(); + } + // If dir has more components than file, then file can't possibly reside in dir. + auto dir_len = std::distance(parent.begin(), parent.end()); + auto file_len = std::distance(sub.begin(), sub.end()); + if (dir_len > file_len) { + return false; + } + + return std::equal(parent.begin(), parent.end(), sub.begin()); +} + } // end namespace palo diff --git a/be/src/util/filesystem_util.h b/be/src/util/filesystem_util.h index 6c071aa503fd37..b101b65a5f772c 100644 --- a/be/src/util/filesystem_util.h +++ b/be/src/util/filesystem_util.h @@ -57,6 +57,8 @@ class FileSystemUtil { // Returns the currently allowed maximum of possible file descriptors. In case of an // error returns 0. static uint64_t max_num_file_handles(); + + static bool contain_path(const std::string& parent_path, const std::string& sub_path); }; } diff --git a/be/test/util/filesystem_util_test.cpp b/be/test/util/filesystem_util_test.cpp index f5b47df25882a2..bf6817e32fb4ed 100644 --- a/be/test/util/filesystem_util_test.cpp +++ b/be/test/util/filesystem_util_test.cpp @@ -27,11 +27,11 @@ namespace palo { namespace filesystem = boost::filesystem; using filesystem::path; -TEST(FilesystemUtil, rlimit) { +TEST(FileSystemUtil, rlimit) { ASSERT_LT(0ul, FileSystemUtil::max_num_file_handles()); } -TEST(FilesystemUtil, CreateDirectory) { +TEST(FileSystemUtil, CreateDirectory) { // Setup a temporary directory with one subdir path dir = filesystem::unique_path(); path subdir1 = dir / "path1"; @@ -61,14 +61,71 @@ TEST(FilesystemUtil, CreateDirectory) { filesystem::remove_all(dir); } +TEST(FilesystemUtil, contain_path) { + { + std::string parent("/a/b"); + std::string sub("/a/b/c"); + EXPECT_TRUE(FileSystemUtil::contain_path(parent, sub)); + EXPECT_FALSE(FileSystemUtil::contain_path(sub, parent)); + EXPECT_TRUE(FileSystemUtil::contain_path(parent, parent)); + EXPECT_TRUE(FileSystemUtil::contain_path(sub, sub)); + } + + { + std::string parent("/a/b/"); + std::string sub("/a/b/c/"); + EXPECT_TRUE(FileSystemUtil::contain_path(parent, sub)); + EXPECT_FALSE(FileSystemUtil::contain_path(sub, parent)); + EXPECT_TRUE(FileSystemUtil::contain_path(parent, parent)); + EXPECT_TRUE(FileSystemUtil::contain_path(sub, sub)); + } + + { + std::string parent("/a///./././/./././b/"); // "/a/b/." + std::string sub("/a/b/../././b/c/"); // "/a/b/c/" + EXPECT_TRUE(FileSystemUtil::contain_path(parent, sub)); + EXPECT_FALSE(FileSystemUtil::contain_path(sub, parent)); + EXPECT_TRUE(FileSystemUtil::contain_path(parent, parent)); + EXPECT_TRUE(FileSystemUtil::contain_path(sub, sub)); + } + + { + // relative path + std::string parent("a/b/"); // "a/b/" + std::string sub("a/b/c/"); // "a/b/c/" + EXPECT_TRUE(FileSystemUtil::contain_path(parent, sub)); + EXPECT_FALSE(FileSystemUtil::contain_path(sub, parent)); + EXPECT_TRUE(FileSystemUtil::contain_path(parent, parent)); + EXPECT_TRUE(FileSystemUtil::contain_path(sub, sub)); + } + { + // relative path + std::string parent("a////./././b/"); // "a/b/" + std::string sub("a/b/../././b/c/"); // "a/b/c/" + EXPECT_TRUE(FileSystemUtil::contain_path(parent, sub)); + EXPECT_FALSE(FileSystemUtil::contain_path(sub, parent)); + EXPECT_TRUE(FileSystemUtil::contain_path(parent, parent)); + EXPECT_TRUE(FileSystemUtil::contain_path(sub, sub)); + } + { + // absolute path and relative path + std::string parent("/a////./././b/"); // "/a/b/" + std::string sub("a/b/../././b/c/"); // "a/b/c/" + EXPECT_FALSE(FileSystemUtil::contain_path(parent, sub)); + EXPECT_FALSE(FileSystemUtil::contain_path(sub, parent)); + EXPECT_TRUE(FileSystemUtil::contain_path(parent, parent)); + EXPECT_TRUE(FileSystemUtil::contain_path(sub, sub)); + } +} + } // end namespace palo int main(int argc, char** argv) { - // std::string conffile = std::string(getenv("PALO_HOME")) + "/conf/be.conf"; - // if (!palo::config::init(conffile.c_str(), false)) { - // fprintf(stderr, "error read config file. \n"); - // return -1; - // } + std::string conffile = std::string(getenv("PALO_HOME")) + "/conf/be.conf"; + if (!palo::config::init(conffile.c_str(), false)) { + fprintf(stderr, "error read config file. \n"); + return -1; + } palo::init_glog("be-test"); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS();