Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
c96e047
merge to bc456abcfbda9cf3ef4aeb1989f0accc8b01b786
morningman-cmy Jun 22, 2018
7d9960e
merge to 9f16989b3dd9aa46baf601580aa343e3128adccc produce uncorrect T…
morningman-cmy Jun 28, 2018
f0265ac
merge to c9f1f1b7d3ac73c8f0b88a699186c03e24d5ad31 remove HyperTable rpc
morningman-cmy Jul 16, 2018
3b60018
merge to 56dad2a21a28613c8384923f39a280109dab1a44 copy exprs result i…
morningman-cmy Jul 16, 2018
1a4eda8
merge to b5f11211e67c470b616d3465dd15fdeddffbb8f2 fix remove useful c…
morningman-cmy Jul 16, 2018
9c093d2
merge to 36dc58d4680901ce51a1d4486c5467ecea484843 NULL_VALUE can't tr…
morningman-cmy Jul 16, 2018
53986f5
merge to f3ee363dc50fe30c529cf6f160779641563e35ee spelling error
morningman-cmy Jul 16, 2018
a2207be
merge to d1e08f75e0f50b101090cceacab1611c745ddc5a new on specfied mem…
morningman-cmy Jul 16, 2018
e369da1
merge to 1183c0a0b0e3a7ce8df416163a6ec14d642193a8 remove unused mater…
morningman-cmy Jul 16, 2018
8f7b37d
merge to 52c2e44dcf9f4048e4cf3255b5a62c59a2526995 fix file descriptor…
morningman-cmy Jul 16, 2018
94cdca4
merge to c8382d9de789c709ffbcb51387c3d2c2ba10bbbb change fe to be's r…
morningman-cmy Jul 16, 2018
b92fcd0
merge to 419cc0c33062a85b45b2a9a7acc10f5a92ad26b2 normalize materiali…
morningman-cmy Jul 16, 2018
97c0b88
merge to edb3aac9269c8e74a8654011b1a9fafda336eede compile thirdparty …
morningman-cmy Jul 16, 2018
749c740
merge to Bug fix: crash when fetch no result
morningman-cmy Jul 16, 2018
2a97204
merge to e3f0902aede8d22199ec9a6e0567cc69eb226f80 get backend dropped…
morningman-cmy Jul 16, 2018
1db06ac
merge to 9ab2766c2be5aef2a99c8c9d77da0582d86b8828 backend may be drop…
morningman-cmy Jul 16, 2018
69d0968
merge to 5a434208cd80e8676970a77f4376b9e4a27d8bbb hdfs broker support…
morningman-cmy Jul 16, 2018
804303c
merge to 8a33d8e93851b00b79f17d7f5e55998080bf3d4b add metric to count…
morningman-cmy Jul 16, 2018
7eaf438
merge to 47535bbb75df43ef4aa08cefb023a17379738ef8 remove rpc/ in Cmak…
morningman-cmy Jul 16, 2018
f89be4a
merge to 47535bbb75df43ef4aa08cefb023a17379738ef8 add missing fe rpc …
morningman-cmy Jul 16, 2018
fb904f7
merge to 9625ef157dd44c58802d63cb7547f037b75fd710 modify libevent dow…
morningman-cmy Jul 16, 2018
3c24a10
merge to 9625ef157dd44c58802d63cb7547f037b75fd710 make thirdparty com…
morningman Jul 16, 2018
c27781f
merge to 9625ef157dd44c58802d63cb7547f037b75fd710 add java libs
morningman-cmy Jul 16, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ set(OUTPUT_DIR "${BASE_DIR}/output")
if (DEFINED ENV{PALO_LLVM_HOME})
set(LLVM_HOME "$ENV{PALO_LLVM_HOME}")
else()
set(LLVM_HOME "${THIRDPARTY_DIR}")
set(LLVM_HOME "${THIRDPARTY_DIR}/llvm")
endif()
set(LLVM_BIN "${LLVM_HOME}/bin")

Expand Down Expand Up @@ -102,7 +102,7 @@ find_package(Boost 1.55.0 REQUIRED COMPONENTS thread regex filesystem system dat
include_directories(${Boost_INCLUDE_DIRS})
message(STATUS ${Boost_LIBRARIES})

set(GPERFTOOLS_HOME "${THIRDPARTY_DIR}")
set(GPERFTOOLS_HOME "${THIRDPARTY_DIR}/gperftools")

# Set all libraries
add_library(gflags STATIC IMPORTED)
Expand Down Expand Up @@ -256,7 +256,7 @@ set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -std=gnu++11")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-deprecated -Wno-vla")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DBOOST_SYSTEM_NO_DEPRECATED")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -msse4.2 -D_GLIBCXX_USE_CXX11_ABI=0")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -msse4.2")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DLLVM_ON_UNIX")

# for bprc
Expand Down Expand Up @@ -443,7 +443,6 @@ set(PALO_LINK_LIBS
Gutil
Olap
Runtime
RPC
Service
Udf
Util
Expand Down Expand Up @@ -539,7 +538,6 @@ add_subdirectory(${SRC_DIR}/exprs)
add_subdirectory(${SRC_DIR}/udf)
add_subdirectory(${SRC_DIR}/runtime)
add_subdirectory(${SRC_DIR}/testutil)
add_subdirectory(${SRC_DIR}/rpc)

# Utility CMake function to make specifying tests and benchmarks less verbose
FUNCTION(ADD_BE_TEST TEST_NAME)
Expand Down
2 changes: 1 addition & 1 deletion be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ void HeartbeatServer::heartbeat(
if (status == PALO_SUCCESS) {
backend_info.__set_be_port(config::be_port);
backend_info.__set_http_port(config::webserver_port);
backend_info.__set_be_rpc_port(config::be_rpc_port);
backend_info.__set_be_rpc_port(-1);
backend_info.__set_brpc_port(config::brpc_port);
} else {
status_code = TStatusCode::RUNTIME_ERROR;
Expand Down
14 changes: 13 additions & 1 deletion be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "common/resource_tls.h"
#include "agent/cgroups_mgr.h"
#include "service/backend_options.h"
#include "util/palo_metrics.h"

using std::deque;
using std::list;
Expand All @@ -60,7 +61,7 @@ const uint32_t REPORT_DISK_STATE_WORKER_COUNT = 1;
const uint32_t REPORT_OLAP_TABLE_WORKER_COUNT = 1;
const uint32_t LIST_REMOTE_FILE_TIMEOUT = 15;
const std::string HTTP_REQUEST_PREFIX = "/api/_tablet/_download?";
const std::string HTTP_REQUEST_TOKEN_PARAM = "&token=";
const std::string HTTP_REQUEST_TOKEN_PARAM = "token=";
const std::string HTTP_REQUEST_FILE_PARAM = "&file=";

std::atomic_ulong TaskWorkerPool::_s_report_version(time(NULL) * 10000);
Expand Down Expand Up @@ -284,12 +285,14 @@ void TaskWorkerPool::_finish_task(const TFinishTaskRequest& finish_task_request)
int32_t try_time = 0;

while (try_time < TASK_FINISH_MAX_RETRY) {
PaloMetrics::finish_task_requests_total.increment(1);
AgentStatus client_status = _master_client->finish_task(finish_task_request, &result);

if (client_status == PALO_SUCCESS) {
OLAP_LOG_INFO("finish task success.result: %d", result.status.status_code);
break;
} else {
PaloMetrics::finish_task_requests_failed.increment(1);
OLAP_LOG_WARNING("finish task failed.result: %d", result.status.status_code);
try_time += 1;
}
Expand Down Expand Up @@ -818,6 +821,8 @@ void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) {
clone_req = agent_task_req.clone_req;
worker_pool_this->_tasks.pop_front();
}

PaloMetrics::clone_requests_total.increment(1);
// Try to register to cgroups_mgr
CgroupsMgr::apply_system_cgroup();
OLAP_LOG_INFO("get clone task. signature: %ld", agent_task_req.signature);
Expand Down Expand Up @@ -971,6 +976,7 @@ void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) {

TStatusCode::type status_code = TStatusCode::OK;
if (status != PALO_SUCCESS && status != PALO_CREATE_TABLE_EXIST) {
PaloMetrics::clone_requests_failed.increment(1);
status_code = TStatusCode::RUNTIME_ERROR;
OLAP_LOG_WARNING("clone failed. signature: %ld",
agent_task_req.signature);
Expand Down Expand Up @@ -1506,13 +1512,16 @@ void* TaskWorkerPool::_report_task_worker_thread_callback(void* arg_this) {
OLAP_LOG_INFO("master host: %s, port: %d",
worker_pool_this->_master_info.network_address.hostname.c_str(),
worker_pool_this->_master_info.network_address.port);

PaloMetrics::report_task_requests_total.increment(1);
TMasterResult result;
AgentStatus status = worker_pool_this->_master_client->report(request, &result);

if (status == PALO_SUCCESS) {
OLAP_LOG_INFO("finish report task success. return code: %d",
result.status.status_code);
} else {
PaloMetrics::report_task_requests_failed.increment(1);
OLAP_LOG_WARNING("finish report task failed. status: %d", status);
}

Expand Down Expand Up @@ -1557,13 +1566,15 @@ void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this)
}
request.__set_disks(disks);

PaloMetrics::report_disk_requests_total.increment(1);
TMasterResult result;
AgentStatus status = worker_pool_this->_master_client->report(request, &result);

if (status == PALO_SUCCESS) {
OLAP_LOG_INFO("finish report disk state success. return code: %d",
result.status.status_code);
} else {
PaloMetrics::report_disk_requests_failed.increment(1);
OLAP_LOG_WARNING("finish report disk state failed. status: %d", status);
}

Expand Down Expand Up @@ -1631,6 +1642,7 @@ void* TaskWorkerPool::_report_olap_table_worker_thread_callback(void* arg_this)
OLAP_LOG_INFO("finish report olap table success. return code: %d",
result.status.status_code);
} else {
PaloMetrics::report_all_tablets_requests_failed.increment(1);
OLAP_LOG_WARNING("finish report olap table failed. status: %d", status);
}

Expand Down
13 changes: 3 additions & 10 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ namespace config {
CONF_Int32(cluster_id, "-1");
// port on which ImpalaInternalService is exported
CONF_Int32(be_port, "9060");
CONF_Int32(be_rpc_port, "10060");

// port for brpc
CONF_Int32(brpc_port, "8060");
Expand Down Expand Up @@ -197,8 +196,8 @@ namespace config {
CONF_Bool(dump_ir, "false");
// if set, saves the generated IR to the output file.
CONF_String(module_output, "");
// memory_limiation_per_thread_for_schema_change unit GB
CONF_Int32(memory_limiation_per_thread_for_schema_change, "2");
// memory_limitation_per_thread_for_schema_change unit GB
CONF_Int32(memory_limitation_per_thread_for_schema_change, "2");

CONF_Int64(max_unpacked_row_block_size, "104857600");

Expand Down Expand Up @@ -247,15 +246,9 @@ namespace config {
// Interface to start debug webserver on. If blank, webserver binds to 0.0.0.0
CONF_String(webserver_interface, "");
CONF_String(webserver_doc_root, "${PALO_HOME}");
CONF_Int32(webserver_num_workers, "5");
// If true, webserver may serve static files from the webserver_doc_root
CONF_Bool(enable_webserver_doc_root, "true");
// The number of times to retry connecting to an RPC server. If zero or less,
// connections will be retried until successful
CONF_Int32(rpc_retry_times, "10");
// The interval, in ms, between retrying connections to an RPC server
CONF_Int32(rpc_retry_interval_ms, "30000");
//reactor number
CONF_Int32(rpc_reactor_threads, "10")
// Period to update rate counters and sampling counters in ms.
CONF_Int32(periodic_counter_update_period_ms, "500");

Expand Down
36 changes: 36 additions & 0 deletions be/src/common/status.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ Status::ErrorDetail::ErrorDetail(const TStatus& status) :
DCHECK_NE(error_code, TStatusCode::OK);
}

Status::ErrorDetail::ErrorDetail(const PStatus& pstatus)
: error_code((TStatusCode::type)pstatus.status_code()) {
DCHECK_NE(error_code, TStatusCode::OK);
for (auto& msg : pstatus.error_msgs()) {
error_msgs.push_back(msg);
}
}

Status::Status(const std::string& error_msg) :
_error_detail(new ErrorDetail(TStatusCode::INTERNAL_ERROR, error_msg)) {
LOG(INFO) << error_msg << std::endl << get_stack_trace();
Expand Down Expand Up @@ -76,6 +84,21 @@ Status& Status::operator=(const TStatus& status) {
return *this;
}

Status::Status(const PStatus& pstatus) :
_error_detail((TStatusCode::type)pstatus.status_code() == TStatusCode::OK
? nullptr : new ErrorDetail(pstatus)) {
}

Status& Status::operator=(const PStatus& status) {
delete _error_detail;
if (status.status_code() == (TStatusCode::type)TStatusCode::OK) {
_error_detail = nullptr;
} else {
_error_detail = new ErrorDetail(status);
}
return *this;
}

void Status::add_error_msg(TStatusCode::type code, const std::string& msg) {
if (_error_detail == NULL) {
_error_detail = new ErrorDetail(code, msg);
Expand Down Expand Up @@ -136,6 +159,19 @@ void Status::to_thrift(TStatus* status) const {
}
}

void Status::to_protobuf(PStatus* pstatus) const {
pstatus->clear_error_msgs();
if (_error_detail == nullptr) {
pstatus->set_status_code((int)TStatusCode::OK);
} else {
pstatus->set_status_code(_error_detail->error_code);
pstatus->mutable_error_msgs()->Reserve(_error_detail->error_msgs.size());
for (auto& err_msg : _error_detail->error_msgs) {
pstatus->add_error_msgs(err_msg);
}
}
}

void Status::MergeStatus(const Status& status) {
if (status.ok()) return;
if (_error_detail == NULL) {
Expand Down
14 changes: 14 additions & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "common/logging.h"
#include "common/compiler_util.h"
#include "gen_cpp/Status_types.h" // for TStatus
#include "gen_cpp/status.pb.h" // for PStatus

namespace palo {

Expand Down Expand Up @@ -107,6 +108,9 @@ class Status {
// same as previous c'tor
Status& operator=(const TStatus& status);

Status(const PStatus& pstatus);
Status& operator=(const PStatus& pstatus);

// assign from stringstream
Status& operator=(const std::stringstream& stream);

Expand Down Expand Up @@ -154,6 +158,7 @@ class Status {

// Convert into TStatus.
void to_thrift(TStatus* status) const;
void to_protobuf(PStatus* status) const;

// Return all accumulated error msgs in a single string.
void get_error_msg(std::string* msg) const;
Expand All @@ -175,6 +180,7 @@ class Status {
std::vector<std::string> error_msgs;

ErrorDetail(const TStatus& status);
ErrorDetail(const PStatus& status);
ErrorDetail(TStatusCode::type code)
: error_code(code) {}
ErrorDetail(TStatusCode::type code, const std::string& msg)
Expand All @@ -193,6 +199,14 @@ class Status {
} \
} while (false)

#define RETURN_IF_STATUS_ERROR(status, stmt) \
do { \
status = (stmt); \
if (UNLIKELY(!status.ok())) { \
return; \
} \
} while (false)

#define EXIT_IF_ERROR(stmt) \
do { \
Status _status_ = (stmt); \
Expand Down
8 changes: 3 additions & 5 deletions be/src/exec/broker_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ Status BrokerReader::open() {
TBrokerOpenReaderResponse response;
try {
Status status;
// 500ms is enough
BrokerServiceConnection client(client_cache(_state), broker_addr, 500, &status);
BrokerServiceConnection client(client_cache(_state), broker_addr, 10000, &status);
if (!status.ok()) {
LOG(WARNING) << "Create broker client failed. broker=" << broker_addr
<< ", status=" << status.get_error_msg();
Expand Down Expand Up @@ -133,8 +132,7 @@ Status BrokerReader::read(uint8_t* buf, size_t* buf_len, bool* eof) {
TBrokerReadResponse response;
try {
Status status;
// 500ms is enough
BrokerServiceConnection client(client_cache(_state), broker_addr, 500, &status);
BrokerServiceConnection client(client_cache(_state), broker_addr, 10000, &status);
if (!status.ok()) {
LOG(WARNING) << "Create broker client failed. broker=" << broker_addr
<< ", status=" << status.get_error_msg();
Expand Down Expand Up @@ -189,7 +187,7 @@ void BrokerReader::close() {
try {
Status status;
// 500ms is enough
BrokerServiceConnection client(client_cache(_state), broker_addr, 500, &status);
BrokerServiceConnection client(client_cache(_state), broker_addr, 10000, &status);
if (!status.ok()) {
LOG(WARNING) << "Create broker client failed. broker=" << broker_addr
<< ", status=" << status.get_error_msg();
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/broker_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ Status BrokerWriter::write(const uint8_t* buf, size_t buf_len, size_t* written_l
TBrokerOperationStatus response;
try {
Status status;
// we make timeout to be 5s, to avoid error in Network jitter scenarios.
BrokerServiceConnection client(client_cache(_state), broker_addr, 5000, &status);
// we make timeout to be 10s, to avoid error in Network jitter scenarios.
BrokerServiceConnection client(client_cache(_state), broker_addr, 10000, &status);
if (!status.ok()) {
LOG(WARNING) << "Create broker write client failed. "
<< "broker=" << broker_addr
Expand Down
13 changes: 9 additions & 4 deletions be/src/exec/new_partitioned_aggregation_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -229,13 +229,14 @@ Status NewPartitionedAggregationNode::prepare(RuntimeState* state) {
const RowDescriptor& row_desc = child(0)->row_desc();
RETURN_IF_ERROR(NewAggFnEvaluator::Create(agg_fns_, state, _pool, agg_fn_pool_.get(),
&agg_fn_evals_, expr_mem_tracker(), row_desc));


expr_results_pool_.reset(new MemPool(_expr_mem_tracker.get()));
if (!grouping_exprs_.empty()) {
RowDescriptor build_row_desc(intermediate_tuple_desc_, false);
RETURN_IF_ERROR(NewPartitionedHashTableCtx::Create(_pool, state, build_exprs_,
grouping_exprs_, true, vector<bool>(build_exprs_.size(), true),
state->fragment_hash_seed(), MAX_PARTITION_DEPTH, 1, expr_mem_pool(),
expr_mem_tracker(), build_row_desc, row_desc, &ht_ctx_));
expr_results_pool_.get(), expr_mem_tracker(), build_row_desc, row_desc, &ht_ctx_));
}
// AddCodegenDisabledMessage(state);
return Status::OK;
Expand Down Expand Up @@ -413,6 +414,8 @@ Status NewPartitionedAggregationNode::GetNextInternal(RuntimeState* state,
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(state->check_query_state());
// clear tmp expr result alocations
expr_results_pool_->clear();

if (reached_limit()) {
*eos = true;
Expand Down Expand Up @@ -688,12 +691,14 @@ Status NewPartitionedAggregationNode::close(RuntimeState* state) {
}

ClosePartitions();

child_batch_.reset();

// Close all the agg-fn-evaluators
NewAggFnEvaluator::Close(agg_fn_evals_, state);


if (expr_results_pool_.get() != nullptr) {
expr_results_pool_->free_all();
}
if (agg_fn_pool_.get() != nullptr) agg_fn_pool_->free_all();
if (mem_pool_.get() != nullptr) mem_pool_->free_all();
if (ht_ctx_.get() != nullptr) ht_ctx_->Close(state);
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/new_partitioned_aggregation_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,9 @@ class NewPartitionedAggregationNode : public ExecNode {
/// memory allocation over a series of Reset()/Open()/GetNext()* calls.
boost::scoped_ptr<MemPool> mem_pool_;

// MemPool for allocations made by copying expr results
boost::scoped_ptr<MemPool> expr_results_pool_;

/// The current partition and iterator to the next row in its hash table that we need
/// to return in GetNext()
Partition* output_partition_;
Expand Down
Loading