Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,12 @@ set_target_properties(brpc PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/
add_library(rocksdb STATIC IMPORTED)
set_target_properties(rocksdb PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/librocksdb.a)

add_library(librdkafka STATIC IMPORTED)
set_target_properties(librdkafka PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/librdkafka.a)

add_library(librdkafka_cpp STATIC IMPORTED)
set_target_properties(librdkafka_cpp PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/librdkafka++.a)

add_library(librdkafka STATIC IMPORTED)
set_target_properties(librdkafka PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/librdkafka.a)

find_program(THRIFT_COMPILER thrift ${CMAKE_SOURCE_DIR}/bin)

# llvm-config
Expand Down Expand Up @@ -440,9 +440,10 @@ set(DORIS_LINK_LIBS

# Set thirdparty libraries
set(DORIS_DEPENDENCIES
${WL_START_GROUP}
rocksdb
librdkafka
librdkafka_cpp
librdkafka
lzo
snappy
${Boost_LIBRARIES}
Expand All @@ -456,16 +457,15 @@ set(DORIS_DEPENDENCIES
libevent
mysql
curl
${WL_START_GROUP}
${LIBZ}
${LIBBZ2}
gflags
brpc
protobuf
openssl
crypto
${WL_START_GROUP}
leveldb
${WL_END_GROUP}
)

# Add all external dependencies. They should come after the palo libs.
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/status.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ const Status Status::MEM_LIMIT_EXCEEDED(
const Status Status::THRIFT_RPC_ERROR(
TStatusCode::THRIFT_RPC_ERROR, "Thrift RPC failed", true);

const Status Status::TIMEOUT(
TStatusCode::TIMEOUT, "timeout", true);

Status::ErrorDetail::ErrorDetail(const TStatus& status) :
error_code(status.status_code),
error_msgs(status.error_msgs) {
Expand Down
1 change: 1 addition & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class Status {
static const Status CANCELLED;
static const Status MEM_LIMIT_EXCEEDED;
static const Status THRIFT_RPC_ERROR;
static const Status TIMEOUT;

// copy c'tor makes copy of error detail so Status can be returned by value
Status(const Status& status) : _error_detail(
Expand Down
29 changes: 27 additions & 2 deletions be/src/runtime/kafka_consumer_pipe.cpp → be/src/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,34 @@
// specific language governing permissions and limitations
// under the License.

#include "runtime/kafka_consumer_pipe.h"
#pragma once

#include <string>

namespace doris {

struct AuthInfo {
std::string user;
std::string passwd;
std::string cluster;
std::string user_ip;
// -1 as unset
int64_t auth_code = -1;
};

template<class T>
void set_request_auth(T* req, const AuthInfo& auth) {
if (auth.auth_code != -1) {
// if auth_code is set, no need to set other info
req->auth_code = auth.auth_code;
} else {
req->user = auth.user;
req->passwd = auth.passwd;
if (!auth.cluster.empty()) {
req->__set_cluster(auth.cluster);
}
req->__set_user_ip(auth.user_ip);
}
}

} // end namespace doris
}
2 changes: 1 addition & 1 deletion be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ set(EXEC_FILES
schema_scanner/schema_columns_scanner.cpp
schema_scanner/schema_charsets_scanner.cpp
schema_scanner/schema_collations_scanner.cpp
schema_scanner/frontend_helper.cpp
schema_scanner/schema_helper.cpp
partitioned_hash_table.cc
partitioned_hash_table_ir.cc
partitioned_aggregation_node.cc
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/broker_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
#include "runtime/exec_env.h"
#include "runtime/mem_tracker.h"
#include "runtime/raw_value.h"
#include "runtime/load_stream_mgr.h"
#include "runtime/stream_load_pipe.h"
#include "runtime/stream_load/load_stream_mgr.h"
#include "runtime/stream_load/stream_load_pipe.h"
#include "runtime/tuple.h"
#include "exprs/expr.h"
#include "exec/text_converter.h"
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/plain_text_line_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ Status PlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool* e

// (cmy), for now, return failed to avoid potential endless loop
std::stringstream ss;
ss << "decompress made no progess."
ss << "decompress made no progress."
<< " input_read_bytes: " << input_read_bytes
<< " decompressed_len: " << decompressed_len;
LOG(WARNING) << ss.str();
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include <boost/foreach.hpp>

#include "exec/text_converter.hpp"
#include "exec/schema_scanner/frontend_helper.h"
#include "exec/schema_scanner/schema_helper.h"
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/runtime_state.h"
#include "runtime/row_batch.h"
Expand Down
10 changes: 5 additions & 5 deletions be/src/exec/schema_scanner/schema_columns_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include "runtime/primitive_type.h"
#include "runtime/string_value.h"
#include "runtime/datetime_value.h"
#include "exec/schema_scanner/frontend_helper.h"
#include "exec/schema_scanner/schema_helper.h"

namespace doris {

Expand Down Expand Up @@ -74,7 +74,7 @@ Status SchemaColumnsScanner::start(RuntimeState *state) {
}

if (NULL != _param->ip && 0 != _param->port) {
RETURN_IF_ERROR(FrontendHelper::get_db_names(*(_param->ip),
RETURN_IF_ERROR(SchemaHelper::get_db_names(*(_param->ip),
_param->port, db_params, &_db_result));
} else {
return Status("IP or port dosn't exists");
Expand Down Expand Up @@ -151,7 +151,7 @@ Status SchemaColumnsScanner::fill_one_row(Tuple *tuple, MemPool *pool) {
{
void *slot = tuple->get_slot(_tuple_desc->slots()[1]->tuple_offset());
StringValue* str_slot = reinterpret_cast<StringValue*>(slot);
std::string db_name = FrontendHelper::extract_db_name(_db_result.dbs[_db_index - 1]);
std::string db_name = SchemaHelper::extract_db_name(_db_result.dbs[_db_index - 1]);
str_slot->ptr = (char *)pool->allocate(db_name.size());
str_slot->len = db_name.size();
memcpy(str_slot->ptr, db_name.c_str(), str_slot->len);
Expand Down Expand Up @@ -327,7 +327,7 @@ Status SchemaColumnsScanner::get_new_desc() {
}

if (NULL != _param->ip && 0 != _param->port) {
RETURN_IF_ERROR(FrontendHelper::describe_table(*(_param->ip),
RETURN_IF_ERROR(SchemaHelper::describe_table(*(_param->ip),
_param->port, desc_params, &_desc_result));
} else {
return Status("IP or port dosn't exists");
Expand All @@ -351,7 +351,7 @@ Status SchemaColumnsScanner::get_new_table() {
}

if (NULL != _param->ip && 0 != _param->port) {
RETURN_IF_ERROR(FrontendHelper::get_table_names(*(_param->ip),
RETURN_IF_ERROR(SchemaHelper::get_table_names(*(_param->ip),
_param->port, table_params, &_table_result));
} else {
return Status("IP or port dosn't exists");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#include "exec/schema_scanner/frontend_helper.h"
#include "exec/schema_scanner/schema_helper.h"

#include <sstream>

Expand All @@ -35,120 +35,70 @@
#include "runtime/tuple_row.h"
#include "runtime/client_cache.h"
#include "util/debug_util.h"
#include "util/frontend_helper.h"
#include "util/network_util.h"
#include "util/thrift_util.h"
#include "util/runtime_profile.h"
#include "runtime/client_cache.h"

namespace doris {

ExecEnv* FrontendHelper::_s_exec_env;

using apache::thrift::protocol::TProtocol;
using apache::thrift::protocol::TBinaryProtocol;
using apache::thrift::transport::TSocket;
using apache::thrift::transport::TTransport;
using apache::thrift::transport::TBufferedTransport;

void FrontendHelper::setup(ExecEnv* exec_env) {
_s_exec_env = exec_env;
}

Status FrontendHelper::get_db_names(
Status SchemaHelper::get_db_names(
const std::string& ip,
const int32_t port,
const TGetDbsParams &request,
TGetDbsResult *result) {
return rpc(ip, port,
return FrontendHelper::rpc(ip, port,
[&request, &result] (FrontendServiceConnection& client) {
client->getDbNames(*result, request);
});
}

Status FrontendHelper::get_table_names(
Status SchemaHelper::get_table_names(
const std::string& ip,
const int32_t port,
const TGetTablesParams &request,
TGetTablesResult *result) {
return rpc(ip, port,
return FrontendHelper::rpc(ip, port,
[&request, &result] (FrontendServiceConnection& client) {
client->getTableNames(*result, request);
});
}

Status FrontendHelper::list_table_status(
Status SchemaHelper::list_table_status(
const std::string& ip,
const int32_t port,
const TGetTablesParams &request,
TListTableStatusResult *result) {
return rpc(ip, port,
return FrontendHelper::rpc(ip, port,
[&request, &result] (FrontendServiceConnection& client) {
client->listTableStatus(*result, request);
});
}

Status FrontendHelper::describe_table(
Status SchemaHelper::describe_table(
const std::string& ip,
const int32_t port,
const TDescribeTableParams &request,
TDescribeTableResult *result) {
return rpc(ip, port,
return FrontendHelper::rpc(ip, port,
[&request, &result] (FrontendServiceConnection& client) {
client->describeTable(*result, request);
});
}

Status FrontendHelper::show_varialbes(
Status SchemaHelper::show_varialbes(
const std::string& ip,
const int32_t port,
const TShowVariableRequest &request,
TShowVariableResult *result) {
return rpc(ip, port,
return FrontendHelper::rpc(ip, port,
[&request, &result] (FrontendServiceConnection& client) {
client->showVariables(*result, request);
});
}

Status FrontendHelper::rpc(
const std::string& ip,
const int32_t port,
std::function<void (FrontendServiceConnection&)> callback,
int timeout_ms) {
TNetworkAddress address = make_network_address(ip, port);
Status status;
FrontendServiceConnection client(
_s_exec_env->frontend_client_cache(), address, timeout_ms, &status);
if (!status.ok()) {
LOG(WARNING) << "Connect frontent failed, address=" << address
<< ", status=" << status.get_error_msg();
return status;
}
try {
try {
callback(client);
} catch (apache::thrift::transport::TTransportException& e) {
LOG(WARNING) << "retrying call frontend service, address="
<< address << ", reason=" << e.what();
status = client.reopen(timeout_ms);
if (!status.ok()) {
LOG(WARNING) << "client repoen failed. address=" << address
<< ", status=" << status.get_error_msg();
return status;
}
callback(client);
}
} catch (apache::thrift::TException& e) {
// just reopen to disable this connection
client.reopen(timeout_ms);
LOG(WARNING) << "call frontend service failed, address=" << address
<< ", reason=" << e.what();
return Status(TStatusCode::THRIFT_RPC_ERROR,
"failed to call frontend service", false);
}
return Status::OK;
}

std::string FrontendHelper::extract_db_name(const std::string& full_name) {
std::string SchemaHelper::extract_db_name(const std::string& full_name) {
auto found = full_name.find(':');
if (found == std::string::npos) {
return full_name;
Expand Down
Loading