diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp index ec40b7909e5139..ae0a0504cf08d0 100644 --- a/be/src/agent/heartbeat_server.cpp +++ b/be/src/agent/heartbeat_server.cpp @@ -60,7 +60,7 @@ void HeartbeatServer::heartbeat( << "counter:" << google::COUNTER; // do heartbeat - Status st = _heartbeat(master_info); + Status st = _heartbeat(master_info); st.to_thrift(&heartbeat_result.status); if (st.ok()) { @@ -73,6 +73,8 @@ void HeartbeatServer::heartbeat( Status HeartbeatServer::_heartbeat( const TMasterInfo& master_info) { + + std::lock_guard lk(_hb_mtx); if (master_info.__isset.backend_ip) { if (master_info.backend_ip != BackendOptions::get_localhost()) { @@ -104,12 +106,14 @@ Status HeartbeatServer::_heartbeat( } } + bool need_report = false; if (_master_info->network_address.hostname != master_info.network_address.hostname || _master_info->network_address.port != master_info.network_address.port) { if (master_info.epoch > _epoch) { _master_info->network_address.hostname = master_info.network_address.hostname; _master_info->network_address.port = master_info.network_address.port; _epoch = master_info.epoch; + need_report = true; LOG(INFO) << "master change. new master host: " << _master_info->network_address.hostname << ". port: " << _master_info->network_address.port << ". epoch: " << _epoch; } else { @@ -119,6 +123,13 @@ Status HeartbeatServer::_heartbeat( << " local epoch: " << _epoch << " received epoch: " << master_info.epoch; return Status("epoch is not greater than local. ignore heartbeat."); } + } else { + // when Master FE restarted, host and port remains the same, but epoch will be increased. + if (master_info.epoch > _epoch) { + _epoch = master_info.epoch; + need_report = true; + LOG(INFO) << "master restarted. epoch: " << _epoch; + } } if (master_info.__isset.token) { @@ -132,6 +143,11 @@ Status HeartbeatServer::_heartbeat( } } + if (need_report) { + LOG(INFO) << "Master FE is changed or restarted. report tablet and disk info immediately"; + _olap_engine->report_notify(true); + } + return Status::OK; } diff --git a/be/src/agent/heartbeat_server.h b/be/src/agent/heartbeat_server.h index 1a36187ca4c0c3..5b3165dcb07739 100644 --- a/be/src/agent/heartbeat_server.h +++ b/be/src/agent/heartbeat_server.h @@ -18,6 +18,8 @@ #ifndef DORIS_BE_SRC_AGENT_HEARTBEAT_SERVER_H #define DORIS_BE_SRC_AGENT_HEARTBEAT_SERVER_H +#include + #include "thrift/transport/TTransportUtils.h" #include "agent/status.h" @@ -48,14 +50,18 @@ class HeartbeatServer : public HeartbeatServiceIf { // Output parameters: // * heartbeat_result: The result of heartbeat set virtual void heartbeat(THeartbeatResult& heartbeat_result, const TMasterInfo& master_info); -private: +private: Status _heartbeat( const TMasterInfo& master_info); - TMasterInfo* _master_info; OLAPEngine* _olap_engine; + + // mutex to protect master_info and _epoch + std::mutex _hb_mtx; + TMasterInfo* _master_info; int64_t _epoch; + DISALLOW_COPY_AND_ASSIGN(HeartbeatServer); }; // class HeartBeatServer diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 08626d8d15765d..c9c19341d165a1 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -27,6 +28,7 @@ #include #include #include + #include "boost/filesystem.hpp" #include "boost/lexical_cast.hpp" #include "agent/pusher.h" @@ -80,8 +82,6 @@ map> TaskWorkerPool::_s_running_task_user map> TaskWorkerPool::_s_total_task_user_count; map TaskWorkerPool::_s_total_task_count; FrontendServiceClientCache TaskWorkerPool::_master_service_client_cache; -std::mutex TaskWorkerPool::_disk_broken_lock; -std::chrono::seconds TaskWorkerPool::_wait_duration; TaskWorkerPool::TaskWorkerPool( const TaskWorkerType task_worker_type, @@ -167,12 +167,10 @@ void TaskWorkerPool::start() { _callback_function = _report_task_worker_thread_callback; break; case TaskWorkerType::REPORT_DISK_STATE: - _wait_duration = std::chrono::seconds(config::report_disk_state_interval_seconds); _worker_count = REPORT_DISK_STATE_WORKER_COUNT; _callback_function = _report_disk_state_worker_thread_callback; break; case TaskWorkerType::REPORT_OLAP_TABLE: - _wait_duration = std::chrono::seconds(config::report_disk_state_interval_seconds); _worker_count = REPORT_OLAP_TABLE_WORKER_COUNT; _callback_function = _report_olap_table_worker_thread_callback; break; @@ -1825,15 +1823,14 @@ void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this) continue; } #endif - vector root_paths_info; - worker_pool_this->_env->olap_engine()->get_all_root_path_info(&root_paths_info); map disks; for (auto root_path_info : root_paths_info) { TDisk disk; disk.__set_root_path(root_path_info.path); + disk.__set_path_hash(root_path_info.path_hash); disk.__set_disk_total_capacity(static_cast(root_path_info.capacity)); disk.__set_data_used_capacity(static_cast(root_path_info.data_used_capacity)); disk.__set_disk_available_capacity(static_cast(root_path_info.available)); @@ -1854,16 +1851,9 @@ void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this) } #ifndef BE_TEST - { - // wait disk_broken_cv awaken - // if awaken, set is_report_disk_state_already to true, it will not notify again - // if overtime, while will go to next cycle - std::unique_lock lk(_disk_broken_lock); - auto cv_status = OLAPEngine::get_instance()->disk_broken_cv.wait_for(lk, _wait_duration); - if (cv_status == std::cv_status::no_timeout) { - OLAPEngine::get_instance()->is_report_disk_state_already = true; - } - } + // wait for notifying until timeout + OLAPEngine::get_instance()->wait_for_report_notify( + config::report_disk_state_interval_seconds, false); } #endif @@ -1889,7 +1879,6 @@ void* TaskWorkerPool::_report_olap_table_worker_thread_callback(void* arg_this) continue; } #endif - request.tablets.clear(); request.__set_report_version(_s_report_version); @@ -1899,14 +1888,9 @@ void* TaskWorkerPool::_report_olap_table_worker_thread_callback(void* arg_this) OLAP_LOG_WARNING("report get all tablets info failed. status: %d", report_all_tablets_info_status); #ifndef BE_TEST - // wait disk_broken_cv awaken - // if awaken, set is_report_olap_table_already to true, it will not notify again - // if overtime, while will go to next cycle - std::unique_lock lk(_disk_broken_lock); - auto cv_status = OLAPEngine::get_instance()->disk_broken_cv.wait_for(lk, _wait_duration); - if (cv_status == std::cv_status::no_timeout) { - OLAPEngine::get_instance()->is_report_olap_table_already = true; - } + // wait for notifying until timeout + OLAPEngine::get_instance()->wait_for_report_notify( + config::report_olap_table_interval_seconds, true); continue; #else return (void*)0; @@ -1924,14 +1908,9 @@ void* TaskWorkerPool::_report_olap_table_worker_thread_callback(void* arg_this) } #ifndef BE_TEST - // wait disk_broken_cv awaken - // if awaken, set is_report_olap_table_already to true, it will not notify again - // if overtime, while will go to next cycle - std::unique_lock lk(_disk_broken_lock); - auto cv_status = OLAPEngine::get_instance()->disk_broken_cv.wait_for(lk, _wait_duration); - if (cv_status == std::cv_status::no_timeout) { - OLAPEngine::get_instance()->is_report_olap_table_already = true; - } + // wait for notifying until timeout + OLAPEngine::get_instance()->wait_for_report_notify( + config::report_olap_table_interval_seconds, true); } #endif diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h index fd6a6c174dc4f2..750dbcf29ac238 100644 --- a/be/src/agent/task_worker_pool.h +++ b/be/src/agent/task_worker_pool.h @@ -177,9 +177,6 @@ class TaskWorkerPool { static Mutex _s_running_task_user_count_lock; static FrontendServiceClientCache _master_service_client_cache; - static std::mutex _disk_broken_lock; - static std::chrono::seconds _wait_duration; - DISALLOW_COPY_AND_ASSIGN(TaskWorkerPool); }; // class TaskWorkerPool } // namespace doris diff --git a/be/src/olap/olap_engine.cpp b/be/src/olap/olap_engine.cpp index aa8fae4e01ada0..58d14fb37c1ede 100644 --- a/be/src/olap/olap_engine.cpp +++ b/be/src/olap/olap_engine.cpp @@ -84,7 +84,7 @@ bool _sort_table_by_create_time(const OLAPTablePtr& a, const OLAPTablePtr& b) { static Status _validate_options(const EngineOptions& options) { if (options.store_paths.empty()) { - return Status("sotre paths is empty");; + return Status("store paths is empty");; } return Status::OK; } @@ -107,9 +107,7 @@ Status OLAPEngine::open(const EngineOptions& options, OLAPEngine** engine_ptr) { } OLAPEngine::OLAPEngine(const EngineOptions& options) - : is_report_disk_state_already(false), - is_report_olap_table_already(false), - _options(options), + : _options(options), _available_storage_medium_type_count(0), _effective_cluster_id(-1), _is_all_cluster_id_exist(true), @@ -117,7 +115,9 @@ OLAPEngine::OLAPEngine(const EngineOptions& options) _global_table_id(0), _index_stream_lru_cache(NULL), _tablet_stat_cache_update_time_ms(0), - _snapshot_base_id(0) { + _snapshot_base_id(0), + _is_report_disk_state_already(false), + _is_report_olap_table_already(false) { if (_s_instance == nullptr) { _s_instance = this; } @@ -561,14 +561,14 @@ void OLAPEngine::start_disk_stat_monitor() { // if drop tables // notify disk_state_worker_thread and olap_table_worker_thread until they received if (_is_drop_tables) { - disk_broken_cv.notify_all(); + report_notify(true); bool is_report_disk_state_expected = true; bool is_report_olap_table_expected = true; bool is_report_disk_state_exchanged = - is_report_disk_state_already.compare_exchange_strong(is_report_disk_state_expected, false); + _is_report_disk_state_already.compare_exchange_strong(is_report_disk_state_expected, false); bool is_report_olap_table_exchanged = - is_report_olap_table_already.compare_exchange_strong(is_report_olap_table_expected, false); + _is_report_olap_table_already.compare_exchange_strong(is_report_olap_table_expected, false); if (is_report_disk_state_exchanged && is_report_olap_table_exchanged) { _is_drop_tables = false; } @@ -1675,6 +1675,8 @@ OLAPStatus OLAPEngine::report_all_tablets_info(std::map* tab } tablet_info.__set_version_count(olap_table->file_delta_size()); + tablet_info.__set_path_hash(olap_table->store()->path_hash()); + tablet.tablet_infos.push_back(tablet_info); } diff --git a/be/src/olap/olap_engine.h b/be/src/olap/olap_engine.h index f54edc227d50e3..ee720ab63a9e5a 100644 --- a/be/src/olap/olap_engine.h +++ b/be/src/olap/olap_engine.h @@ -57,6 +57,7 @@ struct RootPathInfo { is_used(false) { } std::string path; + int64_t path_hash; int64_t capacity; // 总空间,单位字节 int64_t available; // 可用空间,单位字节 int64_t data_used_capacity; @@ -206,10 +207,6 @@ class OLAPEngine { // 清理trash和snapshot文件,返回清理后的磁盘使用量 OLAPStatus start_trash_sweep(double *usage); - std::condition_variable disk_broken_cv; - std::atomic_bool is_report_disk_state_already; - std::atomic_bool is_report_olap_table_already; - template std::vector get_stores(); Status set_cluster_id(int32_t cluster_id); @@ -357,6 +354,21 @@ class OLAPEngine { const TPushReq& request, std::vector* tablet_info_vec); + // call this if you want to trigger a disk and tablet report + void report_notify(bool is_all) { + is_all ? _report_cv.notify_all() : _report_cv.notify_one(); + } + + // call this to wait a report notification until timeout + void wait_for_report_notify(int64_t timeout_sec, bool is_tablet_report) { + std::unique_lock lk(_report_mtx); + auto cv_status = _report_cv.wait_for(lk, std::chrono::seconds(timeout_sec)); + if (cv_status == std::cv_status::no_timeout) { + is_tablet_report ? _is_report_olap_table_already = true : + _is_report_disk_state_already = true; + } + } + private: OLAPStatus check_all_root_path_cluster_id(); @@ -595,6 +607,12 @@ class OLAPEngine { std::thread _fd_cache_clean_thread; static atomic_t _s_request_number; + + // for tablet and disk report + std::mutex _report_mtx; + std::condition_variable _report_cv; + std::atomic_bool _is_report_disk_state_already; + std::atomic_bool _is_report_olap_table_already; }; } // namespace doris diff --git a/be/src/olap/store.cpp b/be/src/olap/store.cpp index 5842708d1ba141..ccc4a4f11a080e 100755 --- a/be/src/olap/store.cpp +++ b/be/src/olap/store.cpp @@ -39,7 +39,9 @@ #include "olap/file_helper.h" #include "olap/olap_define.h" #include "olap/utils.h" // for check_dir_existed +#include "service/backend_options.h" #include "util/file_utils.h" +#include "util/string_util.h" #include "olap/olap_header_manager.h" namespace doris { @@ -93,7 +95,6 @@ Status OlapStore::load() { RETURN_IF_ERROR(_init_cluster_id()); RETURN_IF_ERROR(_init_extension_and_capacity()); RETURN_IF_ERROR(_init_file_system()); - RETURN_IF_ERROR(_init_meta()); _is_used = true; @@ -275,6 +276,11 @@ Status OlapStore::_init_file_system() { } Status OlapStore::_init_meta() { + // init path hash + _path_hash = hash_of_path(BackendOptions::get_localhost(), _path); + LOG(INFO) << "get hash of path: " << _path + << ": " << _path_hash; + // init meta _meta = new(std::nothrow) OlapMeta(_path); if (_meta == nullptr) { @@ -581,17 +587,10 @@ OLAPStatus OlapStore::_check_none_row_oriented_table_in_store( } // init must be called RETURN_NOT_OK(olap_header->init()); - OLAPTablePtr olap_table = - OLAPTable::create_from_header(olap_header.release()); - if (olap_table == nullptr) { - LOG(WARNING) << "fail to new table. tablet_id=" << tablet_id << ", schema_hash:" << schema_hash; - return OLAP_ERR_TABLE_CREATE_FROM_HEADER_ERROR; - } - - LOG(INFO) << "data_file_type:" << olap_table->data_file_type(); - if (olap_table->data_file_type() == OLAP_DATA_FILE) { + LOG(INFO) << "data_file_type:" << olap_header->data_file_type(); + if (olap_header->data_file_type() == OLAP_DATA_FILE) { LOG(FATAL) << "Not support row-oriented table any more. Please convert it to column-oriented table." - << "tablet=" << olap_table->full_name(); + << "tablet=" << tablet_id << "." << schema_hash; } return OLAP_SUCCESS; diff --git a/be/src/olap/store.h b/be/src/olap/store.h index 392da3771a8b1f..ca8ba350c33e7a 100644 --- a/be/src/olap/store.h +++ b/be/src/olap/store.h @@ -42,12 +42,14 @@ class OlapStore { Status load(); const std::string& path() const { return _path; } + const int64_t path_hash() const { return _path_hash; } bool is_used() const { return _is_used; } void set_is_used(bool is_used) { _is_used = is_used; } int32_t cluster_id() const { return _cluster_id; } RootPathInfo to_root_path_info() { RootPathInfo info; info.path = _path; + info.path_hash = _path_hash; info.is_used = _is_used; info.capacity = _capacity_bytes; return info; @@ -105,6 +107,7 @@ class OlapStore { friend class OLAPEngine; std::string _path; + int64_t _path_hash; int32_t _cluster_id; uint32_t _rand_seed; diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index 6178740c58c475..6b9ab13639d23f 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -71,6 +71,7 @@ add_library(Util STATIC core_local.cpp uid_util.cpp aes_util.cpp + string_util.cpp ) #ADD_BE_TEST(integer-array-test) diff --git a/be/src/util/blocking_queue.hpp b/be/src/util/blocking_queue.hpp index 77acc9545a3d2c..2ec54a1a415734 100644 --- a/be/src/util/blocking_queue.hpp +++ b/be/src/util/blocking_queue.hpp @@ -20,8 +20,8 @@ #include #include -#include -#include +#include +#include #include "common/logging.h" #include "util/stopwatch.hpp" @@ -45,7 +45,7 @@ class BlockingQueue { // are no more elements available. bool blocking_get(T* out) { MonotonicStopWatch timer; - boost::unique_lock unique_lock(_lock); + std::unique_lock unique_lock(_lock); while (true) { if (!_list.empty()) { @@ -73,10 +73,10 @@ class BlockingQueue { /* bool blocking_put_with_timeout(const T& val, int64_t timeout_micros) { MonotonicStopWatch timer; - boost::unique_lock write_lock(_lock); - boost::system_time wtime = boost::get_system_time() + - boost::posix_time::microseconds(timeout_micros); - const struct timespec timeout = boost::detail::to_timespec(wtime); + std::unique_lock write_lock(_lock); + std::system_time wtime = std::get_system_time() + + std::posix_time::microseconds(timeout_micros); + const struct timespec timeout = std::detail::to_timespec(wtime); bool notified = true; while (SizeLocked(write_lock) >= _max_elements && !_shutdown && notified) { timer.Start(); @@ -102,7 +102,7 @@ class BlockingQueue { // If the queue is shut down, returns false. bool blocking_put(const T& val) { MonotonicStopWatch timer; - boost::unique_lock unique_lock(_lock); + std::unique_lock unique_lock(_lock); while (_list.size() >= _max_elements && !_shutdown) { timer.start(); @@ -126,7 +126,7 @@ class BlockingQueue { // Shut down the queue. Wakes up all threads waiting on BlockingGet or BlockingPut. void shutdown() { { - boost::lock_guard guard(_lock); + std::lock_guard guard(_lock); _shutdown = true; } @@ -135,25 +135,25 @@ class BlockingQueue { } uint32_t get_size() const { - boost::unique_lock l(_lock); + std::unique_lock l(_lock); return _list.size(); } // Returns the total amount of time threads have blocked in BlockingGet. uint64_t total_get_wait_time() const { - boost::lock_guard guard(_lock); + std::lock_guard guard(_lock); return _total_get_wait_time; } // Returns the total amount of time threads have blocked in BlockingPut. uint64_t total_put_wait_time() const { - boost::lock_guard guard(_lock); + std::lock_guard guard(_lock); return _total_put_wait_time; } private: - uint32_t SizeLocked(const boost::unique_lock& lock) const { + uint32_t SizeLocked(const std::unique_lock& lock) const { // The size of 'get_list_' is read racily to avoid getting 'get_lock_' in write path. DCHECK(lock.owns_lock()); return _list.size(); @@ -161,10 +161,10 @@ class BlockingQueue { bool _shutdown; const int _max_elements; - boost::condition_variable _get_cv; // 'get' callers wait on this - boost::condition_variable _put_cv; // 'put' callers wait on this + std::condition_variable _get_cv; // 'get' callers wait on this + std::condition_variable _put_cv; // 'put' callers wait on this // _lock guards access to _list, total_get_wait_time, and total_put_wait_time - mutable boost::mutex _lock; + mutable std::mutex _lock; std::list _list; uint64_t _total_get_wait_time; uint64_t _total_put_wait_time; diff --git a/be/src/util/string_util.cpp b/be/src/util/string_util.cpp new file mode 100644 index 00000000000000..738b03da8d4493 --- /dev/null +++ b/be/src/util/string_util.cpp @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/string_util.h" + +namespace doris { + +std::size_t hash_of_path(const std::string& identifier, const std::string& path) { + std::size_t hash = std::hash()(identifier); + std::vector path_parts; + boost::split(path_parts, path, boost::is_any_of("/")); + for (auto& part : path_parts) { + if (part.empty()) { + continue; + } + + boost::hash_combine(hash, part); + } + return hash; +} + +} // end of namespace diff --git a/be/src/util/string_util.h b/be/src/util/string_util.h index 40f788e526c41c..ccd12a3d8765cb 100644 --- a/be/src/util/string_util.h +++ b/be/src/util/string_util.h @@ -22,8 +22,11 @@ #include #include #include +#include +#include #include // to_lower_copy +#include namespace doris { @@ -57,6 +60,8 @@ struct StringCaseLess { } }; +std::size_t hash_of_path(const std::string& identifier, const std::string& path); + using StringCaseSet = std::set; using StringCaseUnorderedSet = std::unordered_set; template diff --git a/be/test/util/string_util_test.cpp b/be/test/util/string_util_test.cpp index d1fd99134eb49f..4bb29ef0e3d99c 100644 --- a/be/test/util/string_util_test.cpp +++ b/be/test/util/string_util_test.cpp @@ -69,6 +69,23 @@ TEST_F(StringUtilTest, normal) { ASSERT_EQ(345, test_map["abcE"]); ASSERT_EQ(0, test_map.count("ab")); } + { + std::string ip1 = "192.168.1.1"; + std::string ip2 = "192.168.1.2"; + int64_t hash1 = hash_of_path(ip1, "/home/disk1/palo2.HDD"); + int64_t hash2 = hash_of_path(ip1, "/home/disk1//palo2.HDD/"); + int64_t hash3 = hash_of_path(ip1, "home/disk1/palo2.HDD/"); + ASSERT_EQ(hash1, hash2); + ASSERT_EQ(hash3, hash2); + + int64_t hash4 = hash_of_path(ip1, "/home/disk1/palo2.HDD/"); + int64_t hash5 = hash_of_path(ip2, "/home/disk1/palo2.HDD/"); + ASSERT_NE(hash4, hash5); + + int64_t hash6 = hash_of_path(ip1, "/"); + int64_t hash7 = hash_of_path(ip1, "//"); + ASSERT_EQ(hash6, hash7); + } } } diff --git a/fe/src/main/java/org/apache/doris/backup/Repository.java b/fe/src/main/java/org/apache/doris/backup/Repository.java index 9444d92fe25e67..25acd0c1409cd1 100644 --- a/fe/src/main/java/org/apache/doris/backup/Repository.java +++ b/fe/src/main/java/org/apache/doris/backup/Repository.java @@ -17,6 +17,12 @@ package org.apache.doris.backup; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; + +import org.apache.commons.codec.digest.DigestUtils; import org.apache.doris.backup.Status.ErrCode; import org.apache.doris.catalog.BrokerMgr.BrokerAddress; import org.apache.doris.catalog.Catalog; @@ -26,13 +32,6 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.system.Backend; - -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; - -import org.apache.commons.codec.digest.DigestUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.json.JSONObject; diff --git a/fe/src/main/java/org/apache/doris/catalog/DiskInfo.java b/fe/src/main/java/org/apache/doris/catalog/DiskInfo.java index dfaf41c51314b4..6613d027e59eb6 100644 --- a/fe/src/main/java/org/apache/doris/catalog/DiskInfo.java +++ b/fe/src/main/java/org/apache/doris/catalog/DiskInfo.java @@ -39,6 +39,9 @@ public enum DiskState { private long diskAvailableCapacityB; private DiskState state; + // path hash is reported be Backend and no need to persist + private long pathHash; + private DiskInfo() { // for persist } @@ -49,6 +52,7 @@ public DiskInfo(String rootPath) { this.dataUsedCapacityB = 0; this.diskAvailableCapacityB = DEFAULT_CAPACITY_B; this.state = DiskState.ONLINE; + this.pathHash = -1; } public String getRootPath() { @@ -88,10 +92,19 @@ public void setState(DiskState state) { this.state = state; } + public long getPathHash() { + return pathHash; + } + + public void setPathHash(long pathHash) { + this.pathHash = pathHash; + } + @Override public String toString() { - return "DiskInfo [rootPath=" + rootPath + ", totalCapacityB=" + totalCapacityB + ", dataUsedCapacityB=" - + dataUsedCapacityB + ", diskAvailableCapacityB=" + diskAvailableCapacityB + ", state=" + state + "]"; + return "DiskInfo [rootPath=" + rootPath + "(" + pathHash + "), totalCapacityB=" + totalCapacityB + + ", dataUsedCapacityB=" + dataUsedCapacityB + ", diskAvailableCapacityB=" + + diskAvailableCapacityB + ", state=" + state + "]"; } @Override diff --git a/fe/src/main/java/org/apache/doris/catalog/Replica.java b/fe/src/main/java/org/apache/doris/catalog/Replica.java index 0d23d454a1e1be..314516db548431 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/src/main/java/org/apache/doris/catalog/Replica.java @@ -20,7 +20,6 @@ import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -67,6 +66,8 @@ public enum ReplicaStatus { private long lastSuccessVersionHash = 0L; private AtomicLong versionCount = new AtomicLong(-1); + + private long pathHash = -1; public Replica() { } @@ -150,6 +151,15 @@ public long getLastSuccessVersion() { public long getLastSuccessVersionHash() { return lastSuccessVersionHash; } + + public long getPathHash() { + return pathHash; + } + + public void setPathHash(long pathHash) { + this.pathHash = pathHash; + } + // only update data size and row num public synchronized void updateStat(long dataSize, long rowNum) { this.dataSize = dataSize; diff --git a/fe/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index 68f5560db9c20d..661b44cd67adca 100644 --- a/fe/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -17,6 +17,13 @@ package org.apache.doris.catalog; +import com.google.common.base.Preconditions; +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Table; + import org.apache.doris.task.RecoverTabletTask; import org.apache.doris.thrift.TPartitionVersionInfo; import org.apache.doris.thrift.TStorageMedium; @@ -27,14 +34,6 @@ import org.apache.doris.transaction.TableCommitInfo; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionStatus; - -import com.google.common.base.Preconditions; -import com.google.common.collect.HashBasedTable; -import com.google.common.collect.ListMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Table; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -134,6 +133,13 @@ public void tabletReport(long backendId, Map backendTablets, tabletSyncMap.put(tabletMeta.getDbId(), tabletId); } + // check and set path + // path info of replica is only saved in Master FE + if (backendTabletInfo.isSetPath_hash() && + replica.getPathHash() != backendTabletInfo.getPath_hash()) { + replica.setPathHash(backendTabletInfo.getPath_hash()); + } + if (checkNeedRecover(replica, backendTabletInfo.getVersion(), backendTabletInfo.getVersion_hash())) { RecoverTabletTask recoverTabletTask = new RecoverTabletTask(backendId, diff --git a/fe/src/main/java/org/apache/doris/common/proc/BackendProcNode.java b/fe/src/main/java/org/apache/doris/common/proc/BackendProcNode.java index a9b48e1a692b85..0daf9cf0e5b0bd 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/BackendProcNode.java +++ b/fe/src/main/java/org/apache/doris/common/proc/BackendProcNode.java @@ -17,17 +17,19 @@ package org.apache.doris.common.proc; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Pair; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.system.Backend; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; public class BackendProcNode implements ProcNodeInterface { public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() - .add("RootPath").add("TotalCapacity").add("DataUsedCapacity").add("DiskAvailableCapacity").add("State") + .add("RootPath").add("TotalCapacity").add("DataUsedCapacity").add("DiskAvailableCapacity") + .add("State").add("PathHash") .build(); private Backend backend; @@ -46,7 +48,7 @@ public ProcResult fetchResult() throws AnalysisException { for (String infoString : backend.getDiskInfosAsString()) { String[] infos = infoString.split("\\|"); - Preconditions.checkState(infos.length == 5); + Preconditions.checkState(infos.length == 6); Pair totalUnitPair = DebugUtil.getByteUint(Long.valueOf(infos[1])); Pair dataUsedUnitPair = DebugUtil.getByteUint(Long.valueOf(infos[2])); @@ -60,7 +62,7 @@ public ProcResult fetchResult() throws AnalysisException { diskAvailableUnitPair.first) + " " + diskAvailableUnitPair.second; result.addRow(Lists.newArrayList(infos[0], readableTotalCapacity, readableDataUsedCapacity, - readableDiskAvailableCapacity, infos[4])); + readableDiskAvailableCapacity, infos[4], infos[5])); } long totalCapacityB = backend.getTotalCapacityB(); @@ -77,7 +79,7 @@ public ProcResult fetchResult() throws AnalysisException { String readableDiskAvailableCapacity = DebugUtil.DECIMAL_FORMAT_SCALE_3.format(unitPair.first) + " " + unitPair.second; result.addRow(Lists.newArrayList("Total", readableTotalCapacity, readableDataUsedCapacity, - readableDiskAvailableCapacity, "")); + readableDiskAvailableCapacity, "", "")); return result; } diff --git a/fe/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java index 3b2ea0dadd807c..fe1679126defd0 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java +++ b/fe/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java @@ -17,19 +17,19 @@ package org.apache.doris.common.proc; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.Tablet; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.util.ListComparator; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.system.Backend; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; - import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -46,7 +46,7 @@ public class TabletsProcDir implements ProcDirInterface { .add("VersionHash").add("LastSuccessVersion").add("LastSuccessVersionHash") .add("LastFailedVersion").add("LastFailedTime").add("DataSize").add("RowCount").add("State") .add("LastConsistencyCheckTime").add("CheckVersion").add("CheckVersionHash") - .add("VersionCount") + .add("VersionCount").add("PathHash") .build(); private Database db; @@ -86,6 +86,7 @@ public ProcResult fetchResult() { tabletInfo.add(-1); tabletInfo.add(-1); tabletInfo.add(-1); + tabletInfo.add(-1); tabletInfos.add(tabletInfo); } else { @@ -122,7 +123,8 @@ public ProcResult fetchResult() { tabletInfo.add(TimeUtils.longToTimeString(tablet.getLastCheckTime())); tabletInfo.add(tablet.getCheckedVersion()); tabletInfo.add(tablet.getCheckedVersionHash()); - tabletInfo.add(replica.getVersionCount()); + tabletInfo.add(replica.getVersionCount()); + tabletInfo.add(replica.getPathHash()); tabletInfos.add(tabletInfo); } diff --git a/fe/src/main/java/org/apache/doris/load/Load.java b/fe/src/main/java/org/apache/doris/load/Load.java index 3932603ce67301..ddec1eead5d634 100644 --- a/fe/src/main/java/org/apache/doris/load/Load.java +++ b/fe/src/main/java/org/apache/doris/load/Load.java @@ -17,6 +17,15 @@ package org.apache.doris.load; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.gson.Gson; + +import org.apache.commons.lang.StringUtils; import org.apache.doris.analysis.BinaryPredicate; import org.apache.doris.analysis.CancelLoadStmt; import org.apache.doris.analysis.ColumnSeparator; @@ -86,18 +95,6 @@ import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionState.LoadJobSourceType; import org.apache.doris.transaction.TransactionStatus; - -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.gson.Gson; - -import io.fabric8.kubernetes.api.model.extensions.Job; - -import org.apache.commons.lang.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; diff --git a/fe/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/src/main/java/org/apache/doris/master/ReportHandler.java index 049490e90424ac..bea68c344d2328 100644 --- a/fe/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/src/main/java/org/apache/doris/master/ReportHandler.java @@ -17,6 +17,13 @@ package org.apache.doris.master; +import com.google.common.collect.LinkedListMultimap; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Queues; + +import org.apache.commons.lang.StringUtils; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; @@ -58,13 +65,6 @@ import org.apache.doris.thrift.TTablet; import org.apache.doris.thrift.TTabletInfo; import org.apache.doris.thrift.TTaskType; - -import com.google.common.collect.LinkedListMultimap; -import com.google.common.collect.ListMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Queues; - -import org.apache.commons.lang.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; @@ -80,12 +80,15 @@ public class ReportHandler extends Daemon { private static final Logger LOG = LogManager.getLogger(ReportHandler.class); private BlockingQueue reportQueue = Queues.newLinkedBlockingQueue(); + public ReportHandler() { } + public TMasterResult handleReport(TReportRequest request) throws TException { TMasterResult result = new TMasterResult(); TStatus tStatus = new TStatus(TStatusCode.OK); result.setStatus(tStatus); + // get backend TBackend tBackend = request.getBackend(); String host = tBackend.getHost(); @@ -98,6 +101,7 @@ public TMasterResult handleReport(TReportRequest request) throws TException { tStatus.setError_msgs(errorMsgs); return result; } + long beId = backend.getId(); Map> tasks = null; Map disks = null; @@ -107,16 +111,24 @@ public TMasterResult handleReport(TReportRequest request) throws TException { if (request.isSetTasks()) { tasks = request.getTasks(); } + if (request.isSetDisks()) { disks = request.getDisks(); } + if (request.isSetTablets()) { tablets = request.getTablets(); reportVersion = request.getReport_version(); + } else if (request.isSetTablet_list()) { + // the 'tablets' member will be deprecated in future. + tablets = buildTabletMap(request.getTablet_list()); + reportVersion = request.getReport_version(); } + if (request.isSetForce_recovery()) { forceRecovery = request.isForce_recovery(); } + ReportTask reportTask = new ReportTask(beId, tasks, disks, tablets, reportVersion, forceRecovery); try { reportQueue.put(reportTask); @@ -128,16 +140,32 @@ public TMasterResult handleReport(TReportRequest request) throws TException { tStatus.setError_msgs(errorMsgs); return result; } + LOG.info("receive report from be {}. current queue size: {}", backend.getId(), reportQueue.size()); return result; } + + private Map buildTabletMap(List tabletList) { + Map tabletMap = Maps.newHashMap(); + for (TTablet tTablet : tabletList) { + if (tTablet.getTablet_infos().isEmpty()) { + continue; + } + + tabletMap.put(tTablet.getTablet_infos().get(0).getTablet_id(), tTablet); + } + return tabletMap; + } + private class ReportTask extends MasterTask { + private long beId; private Map> tasks; private Map disks; private Map tablets; private long reportVersion; private boolean forceRecovery = false; + public ReportTask(long beId, Map> tasks, Map disks, Map tablets, long reportVersion, @@ -149,6 +177,7 @@ public ReportTask(long beId, Map> tasks, this.reportVersion = reportVersion; this.forceRecovery = forceRecovery; } + @Override protected void exec() { if (tasks != null) { @@ -748,6 +777,7 @@ private static void addReplica(long tabletId, TTabletInfo backendTabletInfo, lon db.writeUnlock(); } } + @Override protected void runOneCycle() { while (true) { diff --git a/fe/src/main/java/org/apache/doris/system/Backend.java b/fe/src/main/java/org/apache/doris/system/Backend.java index 46b96dee830201..547de81fd8f8b3 100644 --- a/fe/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/src/main/java/org/apache/doris/system/Backend.java @@ -17,6 +17,10 @@ package org.apache.doris.system; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.google.common.eventbus.EventBus; + import org.apache.doris.alter.DecommissionBackendJob.DecommissionType; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.DiskInfo; @@ -27,11 +31,6 @@ import org.apache.doris.metric.MetricRepo; import org.apache.doris.system.BackendEvent.BackendEventType; import org.apache.doris.thrift.TDisk; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; -import com.google.common.eventbus.EventBus; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -313,7 +312,7 @@ public List getDiskInfosAsString() { for (DiskInfo diskInfo : disks.values()) { diskInfoStrings.add(diskInfo.getRootPath() + "|" + diskInfo.getTotalCapacityB() + "|" + diskInfo.getDataUsedCapacityB() + "|" + diskInfo.getAvailableCapacityB() + "|" - + diskInfo.getState().name()); + + diskInfo.getState().name() + "|" + diskInfo.getPathHash()); } return diskInfoStrings; } @@ -352,6 +351,15 @@ public long getDataUsedCapacityB() { return dataUsedCapacityB; } + public String getPathByPathHash(long pathHash) { + for (DiskInfo diskInfo : disksRef.get().values()) { + if (diskInfo.getPathHash() == pathHash) { + return diskInfo.getRootPath(); + } + } + return null; + } + public void updateDisks(Map backendDisks) { // update status or add new diskInfo ImmutableMap disks = disksRef.get(); @@ -373,6 +381,10 @@ public void updateDisks(Map backendDisks) { diskInfo.setTotalCapacityB(totalCapacityB); diskInfo.setDataUsedCapacityB(dataUsedCapacityB); diskInfo.setAvailableCapacityB(diskAvailableCapacityB); + if (tDisk.isSetPath_hash()) { + diskInfo.setPathHash(tDisk.getPath_hash()); + } + if (isUsed) { diskInfo.setState(DiskState.ONLINE); } else { diff --git a/fe/src/test/java/org/apache/doris/common/proc/BackendProcNodeTest.java b/fe/src/test/java/org/apache/doris/common/proc/BackendProcNodeTest.java index d5e28a97cf68ef..370d985ab9f7e9 100644 --- a/fe/src/test/java/org/apache/doris/common/proc/BackendProcNodeTest.java +++ b/fe/src/test/java/org/apache/doris/common/proc/BackendProcNodeTest.java @@ -85,7 +85,7 @@ public void testResultNormal() throws AnalysisException { Assert.assertTrue(result.getRows().size() >= 1); Assert.assertEquals(Lists.newArrayList("RootPath", "TotalCapacity", "DataUsedCapacity", - "DiskAvailableCapacity", "State"), + "DiskAvailableCapacity", "State", "PathHash"), result.getColumnNames()); } diff --git a/gensrc/thrift/MasterService.thrift b/gensrc/thrift/MasterService.thrift index 43fc594285c0c6..5c7aec4ab84641 100644 --- a/gensrc/thrift/MasterService.thrift +++ b/gensrc/thrift/MasterService.thrift @@ -33,6 +33,7 @@ struct TTabletInfo { 7: optional Types.TStorageMedium storage_medium 8: optional list transaction_ids 9: optional i64 version_count + 10: optional i64 path_hash } struct TFinishTaskRequest { @@ -62,6 +63,7 @@ struct TDisk { 3: required Types.TSize data_used_capacity 4: required bool used 5: optional Types.TSize disk_available_capacity + 6: optional i64 path_hash } struct TReportRequest { @@ -71,6 +73,7 @@ struct TReportRequest { 4: optional map tablets 5: optional map disks // string root_path 6: optional bool force_recovery + 7: optional list tablet_list } struct TMasterResult { diff --git a/run-ut.sh b/run-ut.sh index 9736799f0b3333..f56470f15c0aec 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -149,6 +149,7 @@ ${DORIS_TEST_BINARY_DIR}/util/json_util_test ${DORIS_TEST_BINARY_DIR}/util/byte_buffer_test2 ${DORIS_TEST_BINARY_DIR}/util/uid_util_test ${DORIS_TEST_BINARY_DIR}/util/aes_util_test +${DORIS_TEST_BINARY_DIR}/util/string_util_test ## Running common Unittest ${DORIS_TEST_BINARY_DIR}/common/resource_tls_test