Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void HeartbeatServer::heartbeat(
<< "counter:" << google::COUNTER;

// do heartbeat
Status st = _heartbeat(master_info);
Status st = _heartbeat(master_info);
st.to_thrift(&heartbeat_result.status);

if (st.ok()) {
Expand All @@ -73,6 +73,8 @@ void HeartbeatServer::heartbeat(

Status HeartbeatServer::_heartbeat(
const TMasterInfo& master_info) {

std::lock_guard<std::mutex> lk(_hb_mtx);

if (master_info.__isset.backend_ip) {
if (master_info.backend_ip != BackendOptions::get_localhost()) {
Expand Down Expand Up @@ -104,12 +106,14 @@ Status HeartbeatServer::_heartbeat(
}
}

bool need_report = false;
if (_master_info->network_address.hostname != master_info.network_address.hostname
|| _master_info->network_address.port != master_info.network_address.port) {
if (master_info.epoch > _epoch) {
_master_info->network_address.hostname = master_info.network_address.hostname;
_master_info->network_address.port = master_info.network_address.port;
_epoch = master_info.epoch;
need_report = true;
LOG(INFO) << "master change. new master host: " << _master_info->network_address.hostname
<< ". port: " << _master_info->network_address.port << ". epoch: " << _epoch;
} else {
Expand All @@ -119,6 +123,13 @@ Status HeartbeatServer::_heartbeat(
<< " local epoch: " << _epoch << " received epoch: " << master_info.epoch;
return Status("epoch is not greater than local. ignore heartbeat.");
}
} else {
// When the Master FE restarts, its host and port remain the same, but its epoch is increased.
if (master_info.epoch > _epoch) {
_epoch = master_info.epoch;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function may be called concurrently, and testing and changing _epoch without any protection could lead to strange results.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a mutex to protect the _heartbeat() function.

need_report = true;
LOG(INFO) << "master restarted. epoch: " << _epoch;
}
}

if (master_info.__isset.token) {
Expand All @@ -132,6 +143,11 @@ Status HeartbeatServer::_heartbeat(
}
}

if (need_report) {
LOG(INFO) << "Master FE is changed or restarted. report tablet and disk info immediately";
_olap_engine->report_notify(true);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. notify_all does not need to hold the mutex first.
  2. report_cv.notify_all() should be encapsulated in a function.
  3. You can use _olap_engine instead of OLAPEngine::get_instance().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


return Status::OK;
}

Expand Down
10 changes: 8 additions & 2 deletions be/src/agent/heartbeat_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#ifndef DORIS_BE_SRC_AGENT_HEARTBEAT_SERVER_H
#define DORIS_BE_SRC_AGENT_HEARTBEAT_SERVER_H

#include <mutex>

#include "thrift/transport/TTransportUtils.h"

#include "agent/status.h"
Expand Down Expand Up @@ -48,14 +50,18 @@ class HeartbeatServer : public HeartbeatServiceIf {
// Output parameters:
// * heartbeat_result: The result of heartbeat set
virtual void heartbeat(THeartbeatResult& heartbeat_result, const TMasterInfo& master_info);
private:

private:
Status _heartbeat(
const TMasterInfo& master_info);

TMasterInfo* _master_info;
OLAPEngine* _olap_engine;

// mutex to protect master_info and _epoch
std::mutex _hb_mtx;
TMasterInfo* _master_info;
int64_t _epoch;

DISALLOW_COPY_AND_ASSIGN(HeartbeatServer);
}; // class HeartBeatServer

Expand Down
45 changes: 12 additions & 33 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <pthread.h>
#include <sys/stat.h>
#include <atomic>
#include <chrono>
#include <csignal>
#include <ctime>
#include <fstream>
Expand All @@ -27,6 +28,7 @@
#include <sstream>
#include <string>
#include <sys/stat.h>

#include "boost/filesystem.hpp"
#include "boost/lexical_cast.hpp"
#include "agent/pusher.h"
Expand Down Expand Up @@ -80,8 +82,6 @@ map<TTaskType::type, map<string, uint32_t>> TaskWorkerPool::_s_running_task_user
map<TTaskType::type, map<string, uint32_t>> TaskWorkerPool::_s_total_task_user_count;
map<TTaskType::type, uint32_t> TaskWorkerPool::_s_total_task_count;
FrontendServiceClientCache TaskWorkerPool::_master_service_client_cache;
std::mutex TaskWorkerPool::_disk_broken_lock;
std::chrono::seconds TaskWorkerPool::_wait_duration;

TaskWorkerPool::TaskWorkerPool(
const TaskWorkerType task_worker_type,
Expand Down Expand Up @@ -167,12 +167,10 @@ void TaskWorkerPool::start() {
_callback_function = _report_task_worker_thread_callback;
break;
case TaskWorkerType::REPORT_DISK_STATE:
_wait_duration = std::chrono::seconds(config::report_disk_state_interval_seconds);
_worker_count = REPORT_DISK_STATE_WORKER_COUNT;
_callback_function = _report_disk_state_worker_thread_callback;
break;
case TaskWorkerType::REPORT_OLAP_TABLE:
_wait_duration = std::chrono::seconds(config::report_disk_state_interval_seconds);
_worker_count = REPORT_OLAP_TABLE_WORKER_COUNT;
_callback_function = _report_olap_table_worker_thread_callback;
break;
Expand Down Expand Up @@ -1825,15 +1823,14 @@ void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this)
continue;
}
#endif

vector<RootPathInfo> root_paths_info;

worker_pool_this->_env->olap_engine()->get_all_root_path_info(&root_paths_info);

map<string, TDisk> disks;
for (auto root_path_info : root_paths_info) {
TDisk disk;
disk.__set_root_path(root_path_info.path);
disk.__set_path_hash(root_path_info.path_hash);
disk.__set_disk_total_capacity(static_cast<double>(root_path_info.capacity));
disk.__set_data_used_capacity(static_cast<double>(root_path_info.data_used_capacity));
disk.__set_disk_available_capacity(static_cast<double>(root_path_info.available));
Expand All @@ -1854,16 +1851,9 @@ void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this)
}

#ifndef BE_TEST
{
// wait disk_broken_cv awaken
// if awaken, set is_report_disk_state_already to true, it will not notify again
// if overtime, while will go to next cycle
std::unique_lock<std::mutex> lk(_disk_broken_lock);
auto cv_status = OLAPEngine::get_instance()->disk_broken_cv.wait_for(lk, _wait_duration);
if (cv_status == std::cv_status::no_timeout) {
OLAPEngine::get_instance()->is_report_disk_state_already = true;
}
}
// wait for notifying until timeout
OLAPEngine::get_instance()->wait_for_report_notify(
config::report_disk_state_interval_seconds, false);
}
#endif

Expand All @@ -1889,7 +1879,6 @@ void* TaskWorkerPool::_report_olap_table_worker_thread_callback(void* arg_this)
continue;
}
#endif

request.tablets.clear();

request.__set_report_version(_s_report_version);
Expand All @@ -1899,14 +1888,9 @@ void* TaskWorkerPool::_report_olap_table_worker_thread_callback(void* arg_this)
OLAP_LOG_WARNING("report get all tablets info failed. status: %d",
report_all_tablets_info_status);
#ifndef BE_TEST
// wait disk_broken_cv awaken
// if awaken, set is_report_olap_table_already to true, it will not notify again
// if overtime, while will go to next cycle
std::unique_lock<std::mutex> lk(_disk_broken_lock);
auto cv_status = OLAPEngine::get_instance()->disk_broken_cv.wait_for(lk, _wait_duration);
if (cv_status == std::cv_status::no_timeout) {
OLAPEngine::get_instance()->is_report_olap_table_already = true;
}
// wait for notifying until timeout
OLAPEngine::get_instance()->wait_for_report_notify(
config::report_olap_table_interval_seconds, true);
continue;
#else
return (void*)0;
Expand All @@ -1924,14 +1908,9 @@ void* TaskWorkerPool::_report_olap_table_worker_thread_callback(void* arg_this)
}

#ifndef BE_TEST
// wait disk_broken_cv awaken
// if awaken, set is_report_olap_table_already to true, it will not notify again
// if overtime, while will go to next cycle
std::unique_lock<std::mutex> lk(_disk_broken_lock);
auto cv_status = OLAPEngine::get_instance()->disk_broken_cv.wait_for(lk, _wait_duration);
if (cv_status == std::cv_status::no_timeout) {
OLAPEngine::get_instance()->is_report_olap_table_already = true;
}
// wait for notifying until timeout
OLAPEngine::get_instance()->wait_for_report_notify(
config::report_olap_table_interval_seconds, true);
}
#endif

Expand Down
3 changes: 0 additions & 3 deletions be/src/agent/task_worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,6 @@ class TaskWorkerPool {
static Mutex _s_running_task_user_count_lock;
static FrontendServiceClientCache _master_service_client_cache;

static std::mutex _disk_broken_lock;
static std::chrono::seconds _wait_duration;

DISALLOW_COPY_AND_ASSIGN(TaskWorkerPool);
}; // class TaskWorkerPool
} // namespace doris
Expand Down
18 changes: 10 additions & 8 deletions be/src/olap/olap_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ bool _sort_table_by_create_time(const OLAPTablePtr& a, const OLAPTablePtr& b) {

// Validates the engine options.
// Returns an error Status when options.store_paths is empty, Status::OK otherwise.
static Status _validate_options(const EngineOptions& options) {
    if (options.store_paths.empty()) {
        // Single return with the correctly spelled message; the previous
        // duplicate return statement was unreachable.
        return Status("store paths is empty");
    }
    return Status::OK;
}
Expand All @@ -107,17 +107,17 @@ Status OLAPEngine::open(const EngineOptions& options, OLAPEngine** engine_ptr) {
}

OLAPEngine::OLAPEngine(const EngineOptions& options)
: is_report_disk_state_already(false),
is_report_olap_table_already(false),
_options(options),
: _options(options),
_available_storage_medium_type_count(0),
_effective_cluster_id(-1),
_is_all_cluster_id_exist(true),
_is_drop_tables(false),
_global_table_id(0),
_index_stream_lru_cache(NULL),
_tablet_stat_cache_update_time_ms(0),
_snapshot_base_id(0) {
_snapshot_base_id(0),
_is_report_disk_state_already(false),
_is_report_olap_table_already(false) {
if (_s_instance == nullptr) {
_s_instance = this;
}
Expand Down Expand Up @@ -561,14 +561,14 @@ void OLAPEngine::start_disk_stat_monitor() {
// if drop tables
// notify disk_state_worker_thread and olap_table_worker_thread until they received
if (_is_drop_tables) {
disk_broken_cv.notify_all();
report_notify(true);

bool is_report_disk_state_expected = true;
bool is_report_olap_table_expected = true;
bool is_report_disk_state_exchanged =
is_report_disk_state_already.compare_exchange_strong(is_report_disk_state_expected, false);
_is_report_disk_state_already.compare_exchange_strong(is_report_disk_state_expected, false);
bool is_report_olap_table_exchanged =
is_report_olap_table_already.compare_exchange_strong(is_report_olap_table_expected, false);
_is_report_olap_table_already.compare_exchange_strong(is_report_olap_table_expected, false);
if (is_report_disk_state_exchanged && is_report_olap_table_exchanged) {
_is_drop_tables = false;
}
Expand Down Expand Up @@ -1675,6 +1675,8 @@ OLAPStatus OLAPEngine::report_all_tablets_info(std::map<TTabletId, TTablet>* tab
}

tablet_info.__set_version_count(olap_table->file_delta_size());
tablet_info.__set_path_hash(olap_table->store()->path_hash());

tablet.tablet_infos.push_back(tablet_info);
}

Expand Down
26 changes: 22 additions & 4 deletions be/src/olap/olap_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ struct RootPathInfo {
is_used(false) { }

std::string path;
int64_t path_hash;
int64_t capacity; // 总空间,单位字节
int64_t available; // 可用空间,单位字节
int64_t data_used_capacity;
Expand Down Expand Up @@ -206,10 +207,6 @@ class OLAPEngine {
// Clean up trash and snapshot files, and return the disk usage after cleanup
// (presumably reported through the `usage` out-parameter — TODO confirm).
OLAPStatus start_trash_sweep(double *usage);

std::condition_variable disk_broken_cv;
std::atomic_bool is_report_disk_state_already;
std::atomic_bool is_report_olap_table_already;

template<bool include_unused = false>
std::vector<OlapStore*> get_stores();
Status set_cluster_id(int32_t cluster_id);
Expand Down Expand Up @@ -357,6 +354,21 @@ class OLAPEngine {
const TPushReq& request,
std::vector<TTabletInfo>* tablet_info_vec);

// Call this to trigger a disk and tablet report by waking threads that are
// waiting on the report condition variable.
// is_all: when true every waiter is woken, otherwise only one waiter is woken.
void report_notify(bool is_all) {
    if (is_all) {
        _report_cv.notify_all();
    } else {
        _report_cv.notify_one();
    }
}

// Call this to wait for a report notification for at most timeout_sec seconds.
// If the wait ends for any reason other than the timeout, the matching
// "already reported" flag is set: the tablet flag when is_tablet_report is
// true, the disk-state flag otherwise.
void wait_for_report_notify(int64_t timeout_sec, bool is_tablet_report) {
    std::unique_lock<std::mutex> guard(_report_mtx);
    const std::cv_status wait_result =
            _report_cv.wait_for(guard, std::chrono::seconds(timeout_sec));
    if (wait_result == std::cv_status::timeout) {
        // Timed out without being woken: leave both flags untouched.
        return;
    }
    if (is_tablet_report) {
        _is_report_olap_table_already = true;
    } else {
        _is_report_disk_state_already = true;
    }
}

private:
OLAPStatus check_all_root_path_cluster_id();

Expand Down Expand Up @@ -595,6 +607,12 @@ class OLAPEngine {
std::thread _fd_cache_clean_thread;

static atomic_t _s_request_number;

// for tablet and disk report
std::mutex _report_mtx;
std::condition_variable _report_cv;
std::atomic_bool _is_report_disk_state_already;
std::atomic_bool _is_report_olap_table_already;
};

} // namespace doris
Expand Down
21 changes: 10 additions & 11 deletions be/src/olap/store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
#include "olap/file_helper.h"
#include "olap/olap_define.h"
#include "olap/utils.h" // for check_dir_existed
#include "service/backend_options.h"
#include "util/file_utils.h"
#include "util/string_util.h"
#include "olap/olap_header_manager.h"

namespace doris {
Expand Down Expand Up @@ -93,7 +95,6 @@ Status OlapStore::load() {
RETURN_IF_ERROR(_init_cluster_id());
RETURN_IF_ERROR(_init_extension_and_capacity());
RETURN_IF_ERROR(_init_file_system());

RETURN_IF_ERROR(_init_meta());

_is_used = true;
Expand Down Expand Up @@ -275,6 +276,11 @@ Status OlapStore::_init_file_system() {
}

Status OlapStore::_init_meta() {
// init path hash
_path_hash = hash_of_path(BackendOptions::get_localhost(), _path);
LOG(INFO) << "get hash of path: " << _path
<< ": " << _path_hash;

// init meta
_meta = new(std::nothrow) OlapMeta(_path);
if (_meta == nullptr) {
Expand Down Expand Up @@ -581,17 +587,10 @@ OLAPStatus OlapStore::_check_none_row_oriented_table_in_store(
}
// init must be called
RETURN_NOT_OK(olap_header->init());
OLAPTablePtr olap_table =
OLAPTable::create_from_header(olap_header.release());
if (olap_table == nullptr) {
LOG(WARNING) << "fail to new table. tablet_id=" << tablet_id << ", schema_hash:" << schema_hash;
return OLAP_ERR_TABLE_CREATE_FROM_HEADER_ERROR;
}

LOG(INFO) << "data_file_type:" << olap_table->data_file_type();
if (olap_table->data_file_type() == OLAP_DATA_FILE) {
LOG(INFO) << "data_file_type:" << olap_header->data_file_type();
if (olap_header->data_file_type() == OLAP_DATA_FILE) {
LOG(FATAL) << "Not support row-oriented table any more. Please convert it to column-oriented table."
<< "tablet=" << olap_table->full_name();
<< "tablet=" << tablet_id << "." << schema_hash;
}

return OLAP_SUCCESS;
Expand Down
Loading