Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
c806068
Refactor BE
yiguolei Dec 4, 2018
873217f
Fix bug that RowCusor do NOT match with RowBlock's layout
Jun 6, 2019
cb847d7
Fix conflict with master
Jun 10, 2019
b508a2f
Call delete old files when all dir convert successfully (#1278)
yiguolei Jun 11, 2019
6af0476
Check deserialize result when save meta (#1296)
yiguolei Jun 13, 2019
41eba09
Add copy function to support storage migration task (#1297)
Jun 13, 2019
9ebf3c1
Add migration lock when migration (#1298)
yiguolei Jun 13, 2019
46689e5
Remove data files using olap_header (#1301)
yiguolei Jun 13, 2019
f2b64a9
Use remove_all_dir to remote non-empty dir (#1313)
yiguolei Jun 14, 2019
ae9c52e
Delete old meta async (#1330)
yiguolei Jun 19, 2019
dbdeccc
Check version hash when full clone (#1331)
yiguolei Jun 19, 2019
00490d9
Place _init_seek_columns() in right place (#1339)
Jun 20, 2019
38f06e5
Make status be consistent with master
Jun 20, 2019
f1d3a71
Be refactor optimize meta tool new (#1337)
kangpinghuang Jun 20, 2019
2a6cbf4
Remove redundant _init_seek_columns()
Jun 20, 2019
8069199
Set version for base_compaction (#1347)
Jun 21, 2019
367e755
Set version of ReaderParams for CumulativeCompaction (#1351)
Jun 21, 2019
8b69c42
remove file created in segment group (#1352)
kangpinghuang Jun 21, 2019
3b97965
fix unused rowset bug (#1355)
kangpinghuang Jun 21, 2019
f1247be
Check get_meta status returned (#1363)
Jun 23, 2019
d7fd8dc
Release schema change lock when get migration lock failed (#1350)
yiguolei Jun 24, 2019
6a6a983
Fix dead lock bug in storage migration (#1371)
yiguolei Jun 24, 2019
ebfa860
add garbase collect by rowsetid (#1374)
kangpinghuang Jun 25, 2019
c916fae
Acquire rs readers at the beginning of the olapscanner (#1400)
yiguolei Jun 27, 2019
f1248a6
fix garbage collect bug (#1397)
kangpinghuang Jun 27, 2019
1ab004d
Fix gc coredump (#1402)
kangpinghuang Jul 1, 2019
15b032e
Check if there exist a rowset contains the added rowset (#1417)
yiguolei Jul 3, 2019
258d448
Fix tablet null pointer and wrong number of rows (#1423)
Jul 3, 2019
00318a0
Fix tablet null pointer and wrong number of rows (#1424)
Jul 3, 2019
0043926
Fix bug of generating non-sense segment when close ColumnDataWriter (…
Jul 8, 2019
2ee4f5f
Add RETURN_NOT_OK redefinition check
Jul 8, 2019
f3bb3ed
Remove unused imported package in FE (#1446)
Jul 9, 2019
92ee3df
Remove log in tablet management (#1448)
yiguolei Jul 9, 2019
34a16df
Add closedir() after opendir() (#1449)
Jul 9, 2019
1d28316
Fix memory leak in streaming load (#1462)
Jul 11, 2019
597cf3e
Fix bug when doing linked schema change. (#1475)
Jul 15, 2019
3d8bdaa
Add log for clone rowset
Jul 15, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ set(CXX_GCC_FLAGS "-g -Wno-unused-local-typedefs")
# Debug information is stored as dwarf2 to be as compatible as possible
# -Werror: compile warnings should be errors when using the toolchain compiler.
# Only enable for debug builds because this is what we test in pre-commit tests.
set(CXX_FLAGS_DEBUG "${CXX_GCC_FLAGS} -Werror -ggdb")
set(CXX_FLAGS_DEBUG "${CXX_GCC_FLAGS} -Werror -ggdb -O0 -gdwarf-2")

# For CMAKE_BUILD_TYPE=Release
# -O3: Enable all compiler optimizations
Expand Down Expand Up @@ -476,6 +476,7 @@ set(DORIS_LINK_LIBS
Exprs
Gutil
Olap
Rowset
Runtime
Service
Udf
Expand Down Expand Up @@ -604,8 +605,9 @@ add_subdirectory(${SRC_DIR}/olap)
add_subdirectory(${SRC_DIR}/runtime)
add_subdirectory(${SRC_DIR}/service)
add_subdirectory(${SRC_DIR}/testutil)
add_subdirectory(${SRC_DIR}/tools)
#add_subdirectory(${SRC_DIR}/tools)
add_subdirectory(${SRC_DIR}/udf)
add_subdirectory(${SRC_DIR}/tools)
add_subdirectory(${SRC_DIR}/udf_samples)
add_subdirectory(${SRC_DIR}/util)

Expand Down
5 changes: 1 addition & 4 deletions be/src/agent/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,10 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/agent")

add_library(Agent STATIC
agent_server.cpp
pusher.cpp
heartbeat_server.cpp
task_worker_pool.cpp
utils.cpp
cgroups_mgr.cpp
topic_subscriber.cpp
user_resource_listener.cpp
)


)
60 changes: 23 additions & 37 deletions be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "gen_cpp/MasterService_types.h"
#include "gen_cpp/Status_types.h"
#include "olap/utils.h"
#include "olap/snapshot_manager.h"
#include "runtime/exec_env.h"
#include "runtime/etl_job_mgr.h"
#include "util/debug_util.h"
Expand Down Expand Up @@ -71,11 +72,11 @@ AgentServer::AgentServer(ExecEnv* exec_env,
}

// init task worker pool
_create_table_workers = new TaskWorkerPool(
_create_tablet_workers = new TaskWorkerPool(
TaskWorkerPool::TaskWorkerType::CREATE_TABLE,
_exec_env,
master_info);
_drop_table_workers = new TaskWorkerPool(
_drop_tablet_workers = new TaskWorkerPool(
TaskWorkerPool::TaskWorkerType::DROP_TABLE,
_exec_env,
master_info);
Expand All @@ -99,7 +100,7 @@ AgentServer::AgentServer(ExecEnv* exec_env,
TaskWorkerPool::TaskWorkerType::DELETE,
_exec_env,
master_info);
_alter_table_workers = new TaskWorkerPool(
_alter_tablet_workers = new TaskWorkerPool(
TaskWorkerPool::TaskWorkerType::ALTER_TABLE,
_exec_env,
master_info);
Expand All @@ -111,10 +112,6 @@ AgentServer::AgentServer(ExecEnv* exec_env,
TaskWorkerPool::TaskWorkerType::STORAGE_MEDIUM_MIGRATE,
_exec_env,
master_info);
_cancel_delete_data_workers = new TaskWorkerPool(
TaskWorkerPool::TaskWorkerType::CANCEL_DELETE_DATA,
_exec_env,
master_info);
_check_consistency_workers = new TaskWorkerPool(
TaskWorkerPool::TaskWorkerType::CHECK_CONSISTENCY,
_exec_env,
Expand All @@ -127,7 +124,7 @@ AgentServer::AgentServer(ExecEnv* exec_env,
TaskWorkerPool::TaskWorkerType::REPORT_DISK_STATE,
_exec_env,
master_info);
_report_olap_table_workers = new TaskWorkerPool(
_report_tablet_workers = new TaskWorkerPool(
TaskWorkerPool::TaskWorkerType::REPORT_OLAP_TABLE,
_exec_env,
master_info);
Expand Down Expand Up @@ -156,21 +153,20 @@ AgentServer::AgentServer(ExecEnv* exec_env,
_exec_env,
master_info);
#ifndef BE_TEST
_create_table_workers->start();
_drop_table_workers->start();
_create_tablet_workers->start();
_drop_tablet_workers->start();
_push_workers->start();
_publish_version_workers->start();
_clear_alter_task_workers->start();
_clear_transaction_task_workers->start();
_delete_workers->start();
_alter_table_workers->start();
_alter_tablet_workers->start();
_clone_workers->start();
_storage_medium_migrate_workers->start();
_cancel_delete_data_workers->start();
_check_consistency_workers->start();
_report_task_workers->start();
_report_disk_state_workers->start();
_report_olap_table_workers->start();
_report_tablet_workers->start();
_upload_workers->start();
_download_workers->start();
_make_snapshot_workers->start();
Expand All @@ -185,11 +181,11 @@ AgentServer::AgentServer(ExecEnv* exec_env,
}

AgentServer::~AgentServer() {
if (_create_table_workers != NULL) {
delete _create_table_workers;
if (_create_tablet_workers != NULL) {
delete _create_tablet_workers;
}
if (_drop_table_workers != NULL) {
delete _drop_table_workers;
if (_drop_tablet_workers != NULL) {
delete _drop_tablet_workers;
}
if (_push_workers != NULL) {
delete _push_workers;
Expand All @@ -206,18 +202,15 @@ AgentServer::~AgentServer() {
if (_delete_workers != NULL) {
delete _delete_workers;
}
if (_alter_table_workers != NULL) {
delete _alter_table_workers;
if (_alter_tablet_workers != NULL) {
delete _alter_tablet_workers;
}
if (_clone_workers != NULL) {
delete _clone_workers;
}
if (_storage_medium_migrate_workers != NULL) {
delete _storage_medium_migrate_workers;
}
if (_cancel_delete_data_workers != NULL) {
delete _cancel_delete_data_workers;
}
if (_check_consistency_workers != NULL) {
delete _check_consistency_workers;
}
Expand All @@ -227,8 +220,8 @@ AgentServer::~AgentServer() {
if (_report_disk_state_workers != NULL) {
delete _report_disk_state_workers;
}
if (_report_olap_table_workers != NULL) {
delete _report_olap_table_workers;
if (_report_tablet_workers != NULL) {
delete _report_tablet_workers;
}
if (_upload_workers != NULL) {
delete _upload_workers;
Expand Down Expand Up @@ -277,14 +270,14 @@ void AgentServer::submit_tasks(
switch (task_type) {
case TTaskType::CREATE:
if (task.__isset.create_tablet_req) {
_create_table_workers->submit_task(task);
_create_tablet_workers->submit_task(task);
} else {
status_code = TStatusCode::ANALYSIS_ERROR;
}
break;
case TTaskType::DROP:
if (task.__isset.drop_tablet_req) {
_drop_table_workers->submit_task(task);
_drop_tablet_workers->submit_task(task);
} else {
status_code = TStatusCode::ANALYSIS_ERROR;
}
Expand Down Expand Up @@ -328,7 +321,7 @@ void AgentServer::submit_tasks(
case TTaskType::ROLLUP:
case TTaskType::SCHEMA_CHANGE:
if (task.__isset.alter_tablet_req) {
_alter_table_workers->submit_task(task);
_alter_tablet_workers->submit_task(task);
} else {
status_code = TStatusCode::ANALYSIS_ERROR;
}
Expand All @@ -347,13 +340,6 @@ void AgentServer::submit_tasks(
status_code = TStatusCode::ANALYSIS_ERROR;
}
break;
case TTaskType::CANCEL_DELETE:
if (task.__isset.cancel_delete_data_req) {
_cancel_delete_data_workers->submit_task(task);
} else {
status_code = TStatusCode::ANALYSIS_ERROR;
}
break;
case TTaskType::CHECK_CONSISTENCY:
if (task.__isset.check_consistency_req) {
_check_consistency_workers->submit_task(task);
Expand Down Expand Up @@ -423,10 +409,10 @@ void AgentServer::make_snapshot(TAgentResult& return_value,
TStatus status;
vector<string> error_msgs;
TStatusCode::type status_code = TStatusCode::OK;

return_value.__set_snapshot_version(PREFERRED_SNAPSHOT_VERSION);
string snapshot_path;
OLAPStatus make_snapshot_status =
_exec_env->olap_engine()->make_snapshot(snapshot_request, &snapshot_path);
SnapshotManager::instance()->make_snapshot(snapshot_request, &snapshot_path);
if (make_snapshot_status != OLAP_SUCCESS) {
status_code = TStatusCode::RUNTIME_ERROR;
OLAP_LOG_WARNING("make_snapshot failed. tablet_id: %ld, schema_hash: %ld, status: %d",
Expand All @@ -453,7 +439,7 @@ void AgentServer::release_snapshot(TAgentResult& return_value, const std::string
TStatusCode::type status_code = TStatusCode::OK;

OLAPStatus release_snapshot_status =
_exec_env->olap_engine()->release_snapshot(snapshot_path);
SnapshotManager::instance()->release_snapshot(snapshot_path);
if (release_snapshot_status != OLAP_SUCCESS) {
status_code = TStatusCode::RUNTIME_ERROR;
LOG(WARNING) << "release_snapshot failed. snapshot_path: " << snapshot_path << ", status: " << release_snapshot_status;
Expand Down
9 changes: 4 additions & 5 deletions be/src/agent/agent_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,21 +93,20 @@ class AgentServer {
ExecEnv* _exec_env;
const TMasterInfo& _master_info;

TaskWorkerPool* _create_table_workers;
TaskWorkerPool* _drop_table_workers;
TaskWorkerPool* _create_tablet_workers;
TaskWorkerPool* _drop_tablet_workers;
TaskWorkerPool* _push_workers;
TaskWorkerPool* _publish_version_workers;
TaskWorkerPool* _clear_alter_task_workers;
TaskWorkerPool* _clear_transaction_task_workers;
TaskWorkerPool* _delete_workers;
TaskWorkerPool* _alter_table_workers;
TaskWorkerPool* _alter_tablet_workers;
TaskWorkerPool* _clone_workers;
TaskWorkerPool* _storage_medium_migrate_workers;
TaskWorkerPool* _cancel_delete_data_workers;
TaskWorkerPool* _check_consistency_workers;
TaskWorkerPool* _report_task_workers;
TaskWorkerPool* _report_disk_state_workers;
TaskWorkerPool* _report_olap_table_workers;
TaskWorkerPool* _report_tablet_workers;
TaskWorkerPool* _upload_workers;
TaskWorkerPool* _download_workers;
TaskWorkerPool* _make_snapshot_workers;
Expand Down
6 changes: 3 additions & 3 deletions be/src/agent/cgroups_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
#include <sys/vfs.h>
#include "boost/filesystem.hpp"
#include "common/logging.h"
#include "olap/store.h"
#include "olap/olap_engine.h"
#include "olap/data_dir.h"
#include "olap/storage_engine.h"
#include "runtime/exec_env.h"
#include "runtime/load_path_mgr.h"

Expand Down Expand Up @@ -188,7 +188,7 @@ AgentStatus CgroupsMgr::_config_disk_throttle(std::string user_name,
}

// add olap engine data path here
auto stores = OLAPEngine::get_instance()->get_stores();
auto stores = StorageEngine::instance()->get_stores();
// buld load data path, it is alreay in data path
// _exec_env->load_path_mgr()->get_load_data_path(&data_paths);

Expand Down
4 changes: 2 additions & 2 deletions be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include "common/status.h"
#include "gen_cpp/HeartbeatService.h"
#include "gen_cpp/Status_types.h"
#include "olap/olap_engine.h"
#include "olap/storage_engine.h"
#include "olap/utils.h"
#include "service/backend_options.h"
#include "util/thrift_server.h"
Expand All @@ -41,7 +41,7 @@ namespace doris {
HeartbeatServer::HeartbeatServer(TMasterInfo* master_info) :
_master_info(master_info),
_epoch(0) {
_olap_engine = OLAPEngine::get_instance();
_olap_engine = StorageEngine::instance();
}

void HeartbeatServer::init_cluster_id() {
Expand Down
4 changes: 2 additions & 2 deletions be/src/agent/heartbeat_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
namespace doris {

const uint32_t HEARTBEAT_INTERVAL = 10;
class OLAPEngine;
class StorageEngine;
class Status;
class ThriftServer;

Expand All @@ -55,7 +55,7 @@ class HeartbeatServer : public HeartbeatServiceIf {
Status _heartbeat(
const TMasterInfo& master_info);

OLAPEngine* _olap_engine;
StorageEngine* _olap_engine;

// mutex to protect master_info and _epoch
std::mutex _hb_mtx;
Expand Down
Loading