8 changes: 8 additions & 0 deletions be/src/common/config.h
@@ -348,6 +348,14 @@ CONF_mInt32(streaming_load_rpc_max_alive_time_sec, "1200");
CONF_Int32(tablet_writer_open_rpc_timeout_sec, "60");
// You can ignore brpc error '[E1011]The server is overcrowded' when writing data.
CONF_mBool(tablet_writer_ignore_eovercrowded, "false");
// batch size of stream load record reported to FE
CONF_mInt32(stream_load_record_batch_size, "50");
// expire time of stream load record in rocksdb.
CONF_Int32(stream_load_record_expire_time_secs, "28800");
// time interval to clean expired stream load records
CONF_mInt64(clean_stream_load_record_interval_secs, "1800");
// the storage path of stream load record rocksdb
CONF_String(stream_load_record_path, "${DORIS_HOME}");

// OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc.
// You may need to lower the speed when the sink receiver bes are too busy.
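These four options follow the file's existing conventions: the CONF_m* forms (stream_load_record_batch_size, clean_stream_load_record_interval_secs, and tablet_writer_ignore_eovercrowded above) are the runtime-mutable variants, while CONF_Int32/CONF_String values are fixed at startup. Like any BE config they can be overridden in be.conf in the usual key = value form; the values below are purely illustrative, not recommendations:

# be.conf — illustrative overrides
stream_load_record_batch_size = 100
stream_load_record_expire_time_secs = 86400
clean_stream_load_record_interval_secs = 600
stream_load_record_path = /data/doris/stream_load_record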
26 changes: 24 additions & 2 deletions be/src/http/action/stream_load.cpp
@@ -39,6 +39,7 @@
#include "http/http_request.h"
#include "http/http_response.h"
#include "http/utils.h"
#include "olap/storage_engine.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
@@ -48,6 +49,7 @@
#include "runtime/stream_load/stream_load_context.h"
#include "runtime/stream_load/stream_load_executor.h"
#include "runtime/stream_load/stream_load_pipe.h"
#include "runtime/stream_load/stream_load_recorder.h"
#include "util/byte_buffer.h"
#include "util/debug_util.h"
#include "util/doris_metrics.h"
@@ -143,7 +145,7 @@ void StreamLoadAction::handle(HttpRequest* req) {
<< ", errmsg=" << ctx->status.get_error_msg();
}
}
ctx->load_cost_nanos = MonotonicNanos() - ctx->start_nanos;
ctx->load_cost_millis = UnixMillis() - ctx->start_millis;

if (!ctx->status.ok() && ctx->status.code() != TStatusCode::PUBLISH_TIMEOUT) {
if (ctx->need_rollback) {
@@ -160,9 +162,12 @@ void StreamLoadAction::handle(HttpRequest* req) {
str = str + '\n';
HttpChannel::send_reply(req, str);

str = ctx->prepare_stream_load_record(str);
_save_stream_load_record(ctx, str);

// update statistics
streaming_load_requests_total->increment(1);
streaming_load_duration_ms->increment(ctx->load_cost_nanos / 1000000);
streaming_load_duration_ms->increment(ctx->load_cost_millis);
streaming_load_bytes->increment(ctx->receive_bytes);
streaming_load_current_processing->increment(-1);
}
@@ -229,6 +234,10 @@ int StreamLoadAction::on_header(HttpRequest* req) {
str = str + '\n';
HttpChannel::send_reply(req, str);
streaming_load_current_processing->increment(-1);

str = ctx->prepare_stream_load_record(str);
_save_stream_load_record(ctx, str);

return -1;
}
return 0;
@@ -533,4 +542,17 @@ Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* file_path)
return Status::OK();
}

void StreamLoadAction::_save_stream_load_record(StreamLoadContext* ctx, const std::string& str) {
    if (str.empty()) {
        // prepare_stream_load_record() returns an empty string when the json could not be parsed.
        return;
    }
    auto stream_load_recorder = StorageEngine::instance()->get_stream_load_recorder();
    if (stream_load_recorder != nullptr) {
        std::string key = std::to_string(ctx->start_millis + ctx->load_cost_millis) + "_" + ctx->label;
        auto st = stream_load_recorder->put(key, str);
        if (st.ok()) {
            LOG(INFO) << "put stream_load_record to rocksdb successfully. label: " << ctx->label << ", key: " << key;
        } else {
            LOG(WARNING) << "failed to put stream_load_record to rocksdb. label: " << ctx->label << ", key: " << key;
        }
    } else {
        LOG(WARNING) << "skip saving stream_load_record: stream_load_recorder is null.";
    }
}

} // namespace doris
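A note on the rocksdb key built in _save_stream_load_record: the finish timestamp (ctx->start_millis + ctx->load_cost_millis) comes first, so rocksdb's default lexicographic key order walks records in completion order, which is what a batched fetch by the FE wants. This works because every epoch-millisecond value between 2001 and 2286 is exactly 13 digits, so string order and numeric order agree. A self-contained sketch of that property, with std::map standing in for rocksdb's ordered keyspace (illustrative only, not code from this change):

#include <cstdint>
#include <iostream>
#include <map>
#include <string>

int main() {
    // Same key scheme as _save_stream_load_record: "<finish_millis>_<label>".
    auto make_key = [](int64_t finish_millis, const std::string& label) {
        return std::to_string(finish_millis) + "_" + label;
    };
    // std::map, like rocksdb's default comparator, orders keys as byte strings.
    std::map<std::string, std::string> records;
    records[make_key(1620000001000, "label_b")] = "{\"Label\": \"label_b\"}";
    records[make_key(1620000000500, "label_a")] = "{\"Label\": \"label_a\"}";
    for (const auto& [key, record] : records) {
        // Prints label_a (finished earlier) before label_b.
        std::cout << key << " -> " << record << "\n";
    }
    return 0;
}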
1 change: 1 addition & 0 deletions be/src/http/action/stream_load.h
@@ -50,6 +50,7 @@ class StreamLoadAction : public HttpHandler {
Status _data_saved_path(HttpRequest* req, std::string* file_path);
Status _execute_plan_fragment(StreamLoadContext* ctx);
Status _process_put(HttpRequest* http_req, StreamLoadContext* ctx);
void _save_stream_load_record(StreamLoadContext* ctx, const std::string& str);

private:
ExecEnv* _exec_env;
26 changes: 25 additions & 1 deletion be/src/olap/storage_engine.cpp
@@ -120,7 +120,8 @@ StorageEngine::StorageEngine(const EngineOptions& options)
_rowset_id_generator(new UniqueRowsetIdGenerator(options.backend_uid)),
_memtable_flush_executor(nullptr),
_default_rowset_type(ALPHA_ROWSET),
_heartbeat_flags(nullptr) {
_heartbeat_flags(nullptr),
_stream_load_recorder(nullptr) {
if (_s_instance == nullptr) {
_s_instance = this;
}
@@ -224,6 +225,29 @@ Status StorageEngine::_init_store_map() {
for (auto store : tmp_stores) {
_store_map.emplace(store->path(), store);
}

RETURN_NOT_OK_STATUS_WITH_WARN(_init_stream_load_recorder(), "init StreamLoadRecorder failed");

return Status::OK();
}

Status StorageEngine::_init_stream_load_recorder() {
std::string stream_load_record_path = config::stream_load_record_path;
LOG(INFO) << "stream load record path: " << stream_load_record_path;

// init stream load record rocksdb
_stream_load_recorder.reset(new StreamLoadRecorder(stream_load_record_path));
if (_stream_load_recorder == nullptr) {
RETURN_NOT_OK_STATUS_WITH_WARN(
Status::MemoryAllocFailed("allocate memory for StreamLoadRecorder failed"),
"new StreamLoadRecorder failed");
}
auto st = _stream_load_recorder->init();
if (!st.ok()) {
RETURN_NOT_OK_STATUS_WITH_WARN(
Status::IOError(Substitute("open StreamLoadRecorder rocksdb failed, path=$0", stream_load_record_path)),
"init StreamLoadRecorder failed");
}
return Status::OK();
}

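StreamLoadRecorder itself is defined in be/src/runtime/stream_load/stream_load_recorder.{h,cpp}, which this diff does not include. For orientation, here is a minimal sketch of the interface the calls above imply — an init() that opens rocksdb under stream_load_record_path, and the put() used by _save_stream_load_record — approximating stream_load_record_expire_time_secs with rocksdb's DBWithTTL utility. The real class may be organized quite differently:

// Hypothetical sketch; not the actual StreamLoadRecorder from this PR.
#include <rocksdb/utilities/db_ttl.h>

#include <memory>
#include <string>

class StreamLoadRecorderSketch {
public:
    explicit StreamLoadRecorderSketch(std::string root_path)
            : _root_path(std::move(root_path)) {}

    rocksdb::Status init() {
        rocksdb::Options options;
        options.create_if_missing = true;
        rocksdb::DBWithTTL* db = nullptr;
        // Records older than the TTL (stream_load_record_expire_time_secs,
        // 28800s by default) become eligible for deletion during compaction.
        rocksdb::Status s = rocksdb::DBWithTTL::Open(
                options, _root_path + "/stream_load", &db, /*ttl=*/28800);
        _db.reset(db);
        return s;
    }

    rocksdb::Status put(const std::string& key, const std::string& value) {
        return _db->Put(rocksdb::WriteOptions(), key, value);
    }

private:
    std::string _root_path;
    std::unique_ptr<rocksdb::DBWithTTL> _db;
};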
7 changes: 7 additions & 0 deletions be/src/olap/storage_engine.h
@@ -50,6 +50,7 @@
#include "olap/task/engine_task.h"
#include "olap/txn_manager.h"
#include "runtime/heartbeat_flags.h"
#include "runtime/stream_load/stream_load_recorder.h"
#include "util/countdown_latch.h"
#include "util/thread.h"
#include "util/threadpool.h"
@@ -174,6 +175,8 @@ class StorageEngine {
void create_base_compaction(TabletSharedPtr best_tablet,
std::shared_ptr<BaseCompaction>& base_compaction);

std::shared_ptr<StreamLoadRecorder> get_stream_load_recorder() { return _stream_load_recorder; }

private:
// Instance should be inited from `static open()`
// MUST NOT be called in other circumstances.
@@ -242,6 +245,8 @@ class StorageEngine {
void _push_tablet_into_submitted_compaction(TabletSharedPtr tablet);
void _pop_tablet_from_submitted_compaction(TabletSharedPtr tablet);

Status _init_stream_load_recorder();

private:
struct CompactionCandidate {
CompactionCandidate(uint32_t nicumulative_compaction_, int64_t tablet_id_, uint32_t index_)
@@ -346,6 +351,8 @@ class StorageEngine {
std::mutex _compaction_producer_sleep_mutex;
std::condition_variable _compaction_producer_sleep_cv;

std::shared_ptr<StreamLoadRecorder> _stream_load_recorder;

DISALLOW_COPY_AND_ASSIGN(StorageEngine);
};

1 change: 1 addition & 0 deletions be/src/runtime/CMakeLists.txt
@@ -91,6 +91,7 @@ set(RUNTIME_FILES
message_body_sink.cpp
stream_load/stream_load_context.cpp
stream_load/stream_load_executor.cpp
stream_load/stream_load_recorder.cpp
stream_load/load_stream_mgr.cpp
routine_load/data_consumer.cpp
routine_load/data_consumer_group.cpp
2 changes: 1 addition & 1 deletion be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -256,7 +256,7 @@ void RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx, DataConsumerPool
// wait for all consumers finished
HANDLE_ERROR(ctx->future.get(), "consume failed");

ctx->load_cost_nanos = MonotonicNanos() - ctx->start_nanos;
ctx->load_cost_millis = UnixMillis() - ctx->start_millis;

// return the consumer back to pool
// call this before commit txn, in case the next task can come very fast
155 changes: 154 additions & 1 deletion be/src/runtime/stream_load/stream_load_context.cpp
@@ -17,6 +17,8 @@

#include "runtime/stream_load/stream_load_context.h"

#include <sstream>

namespace doris {

std::string StreamLoadContext::to_json() const {
@@ -69,7 +71,7 @@ std::string StreamLoadContext::to_json() const {
writer.Key("LoadBytes");
writer.Int64(receive_bytes);
writer.Key("LoadTimeMs");
writer.Int64(load_cost_nanos / 1000000);
writer.Int64(load_cost_millis);
writer.Key("BeginTxnTimeMs");
writer.Int64(begin_txn_cost_nanos / 1000000);
writer.Key("StreamLoadPutTimeMs");
@@ -89,6 +91,157 @@ std::string StreamLoadContext::to_json() const {
return s.GetString();
}

std::string StreamLoadContext::prepare_stream_load_record(const std::string& stream_load_record) {
rapidjson::Document document;
if (document.Parse(stream_load_record.data()).HasParseError()) {
LOG(WARNING) << "prepare stream load record failed. failed to parse json returned to client. label=" << label;
return "";
}
rapidjson::Document::AllocatorType& allocator = document.GetAllocator();

    // Only record optional fields that are actually set; a string value built
    // with SetString() is never null, so emptiness is the meaningful check here.
    if (!auth.cluster.empty()) {
        rapidjson::Value cluster_value(rapidjson::kStringType);
        cluster_value.SetString(auth.cluster.c_str(), auth.cluster.size());
        document.AddMember("cluster", cluster_value, allocator);
    }

    if (!db.empty()) {
        rapidjson::Value db_value(rapidjson::kStringType);
        db_value.SetString(db.c_str(), db.size());
        document.AddMember("Db", db_value, allocator);
    }

    if (!table.empty()) {
        rapidjson::Value table_value(rapidjson::kStringType);
        table_value.SetString(table.c_str(), table.size());
        document.AddMember("Table", table_value, allocator);
    }

    if (!auth.user.empty()) {
        rapidjson::Value user_value(rapidjson::kStringType);
        user_value.SetString(auth.user.c_str(), auth.user.size());
        document.AddMember("User", user_value, allocator);
    }

    if (!auth.user_ip.empty()) {
        rapidjson::Value client_ip_value(rapidjson::kStringType);
        client_ip_value.SetString(auth.user_ip.c_str(), auth.user_ip.size());
        document.AddMember("ClientIp", client_ip_value, allocator);
    }

document.AddMember("StartTime", start_millis, allocator);
document.AddMember("FinishTime", start_millis + load_cost_millis, allocator);
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
document.Accept(writer);
return buffer.GetString();
}
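Taken together with to_json(), a stored record would look roughly like this (values invented for illustration): the load-result fields are the json already replied to the client, with cluster/Db/Table/User/ClientIp/StartTime/FinishTime appended here.

{
    "Label": "example_label",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 1000,
    "NumberLoadedRows": 1000,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 40888,
    "LoadTimeMs": 187,
    "cluster": "default_cluster",
    "Db": "example_db",
    "Table": "example_tbl",
    "User": "root",
    "ClientIp": "127.0.0.1",
    "StartTime": 1620000000500,
    "FinishTime": 1620000000687
}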

void StreamLoadContext::parse_stream_load_record(const std::string& stream_load_record, TStreamLoadRecord& stream_load_item) {

rapidjson::Document document;
std::stringstream ss;
if (document.Parse(stream_load_record.data()).HasParseError()) {
LOG(WARNING) << "failed to parse json from rocksdb.";
return;
}

if (document.HasMember("Label")) {
const rapidjson::Value& label = document["Label"];
stream_load_item.__set_label(label.GetString());
ss << "Label: " << label.GetString();
}

if (document.HasMember("Db")) {
const rapidjson::Value& db = document["Db"];
stream_load_item.__set_db(db.GetString());
ss << ", Db: " << db.GetString();
}

if (document.HasMember("Table")) {
const rapidjson::Value& table = document["Table"];
stream_load_item.__set_tbl(table.GetString());
ss << ", Table: " << table.GetString();
}

if (document.HasMember("User")) {
const rapidjson::Value& user = document["User"];
stream_load_item.__set_user(user.GetString());
ss << ", User: " << user.GetString();
}

if (document.HasMember("ClientIp")) {
const rapidjson::Value& client_ip = document["ClientIp"];
stream_load_item.__set_user_ip(client_ip.GetString());
ss << ", ClientIp: " << client_ip.GetString();
}

if (document.HasMember("Status")) {
const rapidjson::Value& status = document["Status"];
stream_load_item.__set_status(status.GetString());
ss << ", Status: " << status.GetString();
}

if (document.HasMember("Message")) {
const rapidjson::Value& message = document["Message"];
stream_load_item.__set_message(message.GetString());
ss << ", Message: " << message.GetString();
}

if (document.HasMember("ErrorURL")) {
const rapidjson::Value& error_url = document["ErrorURL"];
stream_load_item.__set_url(error_url.GetString());
ss << ", ErrorURL: " << error_url.GetString();
} else {
stream_load_item.__set_url("N/A");
ss << ", ErrorURL: N/A";
}

if (document.HasMember("NumberTotalRows")) {
const rapidjson::Value& total_rows = document["NumberTotalRows"];
stream_load_item.__set_total_rows(total_rows.GetInt());
ss << ", NumberTotalRows: " << total_rows.GetInt();
}

if (document.HasMember("NumberLoadedRows")) {
const rapidjson::Value& loaded_rows = document["NumberLoadedRows"];
stream_load_item.__set_loaded_rows(loaded_rows.GetInt());
ss << ", NumberLoadedRows: " << loaded_rows.GetInt();
}

if (document.HasMember("NumberFilteredRows")) {
const rapidjson::Value& filtered_rows = document["NumberFilteredRows"];
stream_load_item.__set_filtered_rows(filtered_rows.GetInt());
ss << ", NumberFilteredRows: " << filtered_rows.GetInt64();
}

if (document.HasMember("NumberUnselectedRows")) {
const rapidjson::Value& unselected_rows = document["NumberUnselectedRows"];
stream_load_item.__set_unselected_rows(unselected_rows.GetInt());
ss << ", NumberUnselectedRows: " << unselected_rows.GetInt64();
}

if (document.HasMember("LoadBytes")) {
const rapidjson::Value& load_bytes = document["LoadBytes"];
stream_load_item.__set_load_bytes(load_bytes.GetInt());
ss << ", LoadBytes: " << load_bytes.GetInt64();
}

if (document.HasMember("StartTime")) {
const rapidjson::Value& start_time = document["StartTime"];
stream_load_item.__set_start_time(start_time.GetInt64());
ss << ", StartTime: " << start_time.GetInt64();
}

if (document.HasMember("FinishTime")) {
const rapidjson::Value& finish_time = document["FinishTime"];
stream_load_item.__set_finish_time(finish_time.GetInt64());
ss << ", FinishTime: " << finish_time.GetInt64();
}

VLOG(1) << "parse json from rocksdb. " << ss.str();
}
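One caveat on the accessors above: rapidjson's GetString()/GetInt64() assert on a type mismatch, so a record that was truncated or corrupted in rocksdb could abort the BE. If that ever becomes a concern, the reads could be funneled through type-checked helpers along these lines (a hypothetical hardening sketch, not part of this change):

#include <rapidjson/document.h>

#include <cstdint>
#include <string>

static std::string get_string_or(const rapidjson::Document& doc, const char* key,
                                 const std::string& fallback) {
    auto it = doc.FindMember(key);
    // Fall back unless the member exists and really is a string.
    return (it != doc.MemberEnd() && it->value.IsString()) ? it->value.GetString()
                                                           : fallback;
}

static int64_t get_int64_or(const rapidjson::Document& doc, const char* key,
                            int64_t fallback) {
    auto it = doc.FindMember(key);
    // Fall back unless the member exists and really is a 64-bit integer.
    return (it != doc.MemberEnd() && it->value.IsInt64()) ? it->value.GetInt64()
                                                          : fallback;
}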

/*
* The old mini load result format is as follows:
* (which defined in src/util/json_util.cpp)
10 changes: 7 additions & 3 deletions be/src/runtime/stream_load/stream_load_context.h
@@ -83,7 +83,7 @@ class MessageBodySink;
class StreamLoadContext {
public:
StreamLoadContext(ExecEnv* exec_env) : id(UniqueId::gen_uid()), _exec_env(exec_env), _refs(0) {
start_nanos = MonotonicNanos();
start_millis = UnixMillis();
}

~StreamLoadContext() {
@@ -96,6 +96,10 @@
}

std::string to_json() const;

std::string prepare_stream_load_record(const std::string& stream_load_record);
static void parse_stream_load_record(const std::string& stream_load_record, TStreamLoadRecord& stream_load_item);

// the old mini load result format is not same as stream load.
// add this function for compatible with old mini load result format.
std::string to_json_for_mini_load() const;
@@ -169,9 +173,9 @@ class StreamLoadContext {
int64_t number_filtered_rows = 0;
int64_t number_unselected_rows = 0;
int64_t loaded_bytes = 0;
int64_t start_nanos = 0;
int64_t start_millis = 0;
int64_t start_write_data_nanos = 0;
int64_t load_cost_nanos = 0;
int64_t load_cost_millis = 0;
int64_t begin_txn_cost_nanos = 0;
int64_t stream_load_put_cost_nanos = 0;
int64_t commit_and_publish_txn_cost_nanos = 0;
2 changes: 1 addition & 1 deletion be/src/runtime/stream_load/stream_load_executor.cpp
@@ -275,7 +275,7 @@ bool StreamLoadExecutor::collect_load_stat(StreamLoadContext* ctx, TTxnCommitAttachment* attach)
rl_attach.__set_unselectedRows(ctx->number_unselected_rows);
rl_attach.__set_receivedBytes(ctx->receive_bytes);
rl_attach.__set_loadedBytes(ctx->loaded_bytes);
rl_attach.__set_loadCostMs(ctx->load_cost_nanos / 1000 / 1000);
rl_attach.__set_loadCostMs(ctx->load_cost_millis);

attach->rlTaskTxnCommitAttachment = std::move(rl_attach);
attach->__isset.rlTaskTxnCommitAttachment = true;