diff --git a/be/src/common/config.h b/be/src/common/config.h index f1547f1180d7b0..52c542c6ceae53 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -348,6 +348,14 @@ CONF_mInt32(streaming_load_rpc_max_alive_time_sec, "1200"); CONF_Int32(tablet_writer_open_rpc_timeout_sec, "60"); // You can ignore brpc error '[E1011]The server is overcrowded' when writing data. CONF_mBool(tablet_writer_ignore_eovercrowded, "false"); +// batch size of stream load record reported to FE +CONF_mInt32(stream_load_record_batch_size, "50"); +// expire time of stream load record in rocksdb. +CONF_Int32(stream_load_record_expire_time_secs, "28800"); +// time interval to clean expired stream load records +CONF_mInt64(clean_stream_load_record_interval_secs, "1800"); +// the storage path of stream load record rocksdb +CONF_String(stream_load_record_path, "${DORIS_HOME}"); // OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc. // You may need to lower the speed when the sink receiver bes are too busy. 
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 164676c6d62031..bc73dc9d66c59c 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -39,6 +39,7 @@ #include "http/http_request.h" #include "http/http_response.h" #include "http/utils.h" +#include "olap/storage_engine.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" @@ -48,6 +49,7 @@ #include "runtime/stream_load/stream_load_context.h" #include "runtime/stream_load/stream_load_executor.h" #include "runtime/stream_load/stream_load_pipe.h" +#include "runtime/stream_load/stream_load_recorder.h" #include "util/byte_buffer.h" #include "util/debug_util.h" #include "util/doris_metrics.h" @@ -143,7 +145,7 @@ void StreamLoadAction::handle(HttpRequest* req) { << ", errmsg=" << ctx->status.get_error_msg(); } } - ctx->load_cost_nanos = MonotonicNanos() - ctx->start_nanos; + ctx->load_cost_millis = UnixMillis() - ctx->start_millis; if (!ctx->status.ok() && ctx->status.code() != TStatusCode::PUBLISH_TIMEOUT) { if (ctx->need_rollback) { @@ -160,9 +162,12 @@ void StreamLoadAction::handle(HttpRequest* req) { str = str + '\n'; HttpChannel::send_reply(req, str); + str = ctx->prepare_stream_load_record(str); + _sava_stream_load_record(ctx, str); + // update statstics streaming_load_requests_total->increment(1); - streaming_load_duration_ms->increment(ctx->load_cost_nanos / 1000000); + streaming_load_duration_ms->increment(ctx->load_cost_millis); streaming_load_bytes->increment(ctx->receive_bytes); streaming_load_current_processing->increment(-1); } @@ -229,6 +234,10 @@ int StreamLoadAction::on_header(HttpRequest* req) { str = str + '\n'; HttpChannel::send_reply(req, str); streaming_load_current_processing->increment(-1); + + str = ctx->prepare_stream_load_record(str); + _sava_stream_load_record(ctx, str); + return -1; } return 0; @@ -533,4 +542,17 @@ Status StreamLoadAction::_data_saved_path(HttpRequest* 
req, std::string* file_pa return Status::OK(); } +void StreamLoadAction::_sava_stream_load_record(StreamLoadContext* ctx, const std::string& str) { + auto stream_load_recorder = StorageEngine::instance()->get_stream_load_recorder(); + if (stream_load_recorder != nullptr) { + std::string key = std::to_string(ctx->start_millis + ctx->load_cost_millis) + "_" + ctx->label; + auto st = stream_load_recorder->put(key, str); + if (st.ok()) { + LOG(INFO) << "put stream_load_record rocksdb successfully. label: " << ctx->label << ", key: " << key; + } + } else { + LOG(WARNING) << "put stream_load_record rocksdb failed. stream_load_recorder is null."; + } +} + } // namespace doris diff --git a/be/src/http/action/stream_load.h b/be/src/http/action/stream_load.h index 9f7bc2fddc2e11..d2b936716039d5 100644 --- a/be/src/http/action/stream_load.h +++ b/be/src/http/action/stream_load.h @@ -50,6 +50,7 @@ class StreamLoadAction : public HttpHandler { Status _data_saved_path(HttpRequest* req, std::string* file_path); Status _execute_plan_fragment(StreamLoadContext* ctx); Status _process_put(HttpRequest* http_req, StreamLoadContext* ctx); + void _sava_stream_load_record(StreamLoadContext* ctx, const std::string& str); private: ExecEnv* _exec_env; diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 670cd2103243bd..58930c8702bf92 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -120,7 +120,8 @@ StorageEngine::StorageEngine(const EngineOptions& options) _rowset_id_generator(new UniqueRowsetIdGenerator(options.backend_uid)), _memtable_flush_executor(nullptr), _default_rowset_type(ALPHA_ROWSET), - _heartbeat_flags(nullptr) { + _heartbeat_flags(nullptr), + _stream_load_recorder(nullptr) { if (_s_instance == nullptr) { _s_instance = this; } @@ -224,6 +225,29 @@ Status StorageEngine::_init_store_map() { for (auto store : tmp_stores) { _store_map.emplace(store->path(), store); } + + 
RETURN_NOT_OK_STATUS_WITH_WARN(_init_stream_load_recorder(), "init StreamLoadRecorder failed"); + + return Status::OK(); +} + +Status StorageEngine::_init_stream_load_recorder() { + std::string stream_load_record_path = config::stream_load_record_path; + LOG(INFO) << "stream load record path: " << stream_load_record_path; + + // init stream load record rocksdb + _stream_load_recorder.reset(new StreamLoadRecorder(stream_load_record_path)); + if (_stream_load_recorder == nullptr) { + RETURN_NOT_OK_STATUS_WITH_WARN( + Status::MemoryAllocFailed("allocate memory for StreamLoadRecorder failed"), + "new StreamLoadRecorder failed"); + } + auto st = _stream_load_recorder->init(); + if (!st.ok()) { + RETURN_NOT_OK_STATUS_WITH_WARN( + Status::IOError(Substitute("open StreamLoadRecorder rocksdb failed, path=$0", stream_load_record_path)), + "init StreamLoadRecorder failed"); + } return Status::OK(); } diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 04964c96023ee4..78b8916093616c 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -50,6 +50,7 @@ #include "olap/task/engine_task.h" #include "olap/txn_manager.h" #include "runtime/heartbeat_flags.h" +#include "runtime/stream_load/stream_load_recorder.h" #include "util/countdown_latch.h" #include "util/thread.h" #include "util/threadpool.h" @@ -174,6 +175,8 @@ class StorageEngine { void create_base_compaction(TabletSharedPtr best_tablet, std::shared_ptr<BaseCompaction>& base_compaction); + std::shared_ptr<StreamLoadRecorder> get_stream_load_recorder() { return _stream_load_recorder; } + private: // Instance should be inited from `static open()` // MUST NOT be called in other circumstances. 
@@ -242,6 +245,8 @@ class StorageEngine { void _push_tablet_into_submitted_compaction(TabletSharedPtr tablet); void _pop_tablet_from_submitted_compaction(TabletSharedPtr tablet); + Status _init_stream_load_recorder(); + private: struct CompactionCandidate { CompactionCandidate(uint32_t nicumulative_compaction_, int64_t tablet_id_, uint32_t index_) @@ -346,6 +351,8 @@ class StorageEngine { std::mutex _compaction_producer_sleep_mutex; std::condition_variable _compaction_producer_sleep_cv; + std::shared_ptr<StreamLoadRecorder> _stream_load_recorder; + DISALLOW_COPY_AND_ASSIGN(StorageEngine); }; diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 7bd7f541ff4a76..47bea84a28b970 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -91,6 +91,7 @@ set(RUNTIME_FILES message_body_sink.cpp stream_load/stream_load_context.cpp stream_load/stream_load_executor.cpp + stream_load/stream_load_recorder.cpp stream_load/load_stream_mgr.cpp routine_load/data_consumer.cpp routine_load/data_consumer_group.cpp diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index a54f657c1928d8..661ef57cee93e6 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -256,7 +256,7 @@ void RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx, DataConsumerPool // wait for all consumers finished HANDLE_ERROR(ctx->future.get(), "consume failed"); - ctx->load_cost_nanos = MonotonicNanos() - ctx->start_nanos; + ctx->load_cost_millis = UnixMillis() - ctx->start_millis; // return the consumer back to pool // call this before commit txn, in case the next task can come very fast diff --git a/be/src/runtime/stream_load/stream_load_context.cpp b/be/src/runtime/stream_load/stream_load_context.cpp index 240ed08a8baa2a..21cf3159666d87 100644 --- a/be/src/runtime/stream_load/stream_load_context.cpp +++ 
b/be/src/runtime/stream_load/stream_load_context.cpp @@ -17,6 +17,8 @@ #include "runtime/stream_load/stream_load_context.h" +#include + namespace doris { std::string StreamLoadContext::to_json() const { @@ -69,7 +71,7 @@ std::string StreamLoadContext::to_json() const { writer.Key("LoadBytes"); writer.Int64(receive_bytes); writer.Key("LoadTimeMs"); - writer.Int64(load_cost_nanos / 1000000); + writer.Int64(load_cost_millis); writer.Key("BeginTxnTimeMs"); writer.Int64(begin_txn_cost_nanos / 1000000); writer.Key("StreamLoadPutTimeMs"); @@ -89,6 +91,157 @@ std::string StreamLoadContext::to_json() const { return s.GetString(); } +std::string StreamLoadContext::prepare_stream_load_record(const std::string& stream_load_record) { + rapidjson::Document document; + if (document.Parse(stream_load_record.data()).HasParseError()) { + LOG(WARNING) << "prepare stream load record failed. failed to parse json returned to client. label=" << label; + return ""; + } + rapidjson::Document::AllocatorType& allocator = document.GetAllocator(); + + rapidjson::Value cluster_value(rapidjson::kStringType); + cluster_value.SetString(auth.cluster.c_str(), auth.cluster.size()); + if(!cluster_value.IsNull()) { + document.AddMember("cluster", cluster_value, allocator); + } + + rapidjson::Value db_value(rapidjson::kStringType); + db_value.SetString(db.c_str(), db.size()); + if(!db_value.IsNull()) { + document.AddMember("Db", db_value, allocator); + } + + rapidjson::Value table_value(rapidjson::kStringType); + table_value.SetString(table.c_str(), table.size()); + if(!table_value.IsNull()) { + document.AddMember("Table", table_value, allocator); + } + + rapidjson::Value user_value(rapidjson::kStringType); + user_value.SetString(auth.user.c_str(), auth.user.size()); + if(!user_value.IsNull()) { + document.AddMember("User", user_value, allocator); + } + + rapidjson::Value client_ip_value(rapidjson::kStringType); + client_ip_value.SetString(auth.user_ip.c_str(), auth.user_ip.size()); + 
if(!client_ip_value.IsNull()) { + document.AddMember("ClientIp", client_ip_value, allocator); + } + + document.AddMember("StartTime", start_millis, allocator); + document.AddMember("FinishTime", start_millis + load_cost_millis, allocator); + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + document.Accept(writer); + return buffer.GetString(); +} + +void StreamLoadContext::parse_stream_load_record(const std::string& stream_load_record, TStreamLoadRecord& stream_load_item) { + + rapidjson::Document document; + std::stringstream ss; + if (document.Parse(stream_load_record.data()).HasParseError()) { + LOG(WARNING) << "failed to parse json from rocksdb."; + return; + } + + if (document.HasMember("Label")) { + const rapidjson::Value& label = document["Label"]; + stream_load_item.__set_label(label.GetString()); + ss << "Label: " << label.GetString(); + } + + if (document.HasMember("Db")) { + const rapidjson::Value& db = document["Db"]; + stream_load_item.__set_db(db.GetString()); + ss << ", Db: " << db.GetString(); + } + + if (document.HasMember("Table")) { + const rapidjson::Value& table = document["Table"]; + stream_load_item.__set_tbl(table.GetString()); + ss << ", Table: " << table.GetString(); + } + + if (document.HasMember("User")) { + const rapidjson::Value& user = document["User"]; + stream_load_item.__set_user(user.GetString()); + ss << ", User: " << user.GetString(); + } + + if (document.HasMember("ClientIp")) { + const rapidjson::Value& client_ip = document["ClientIp"]; + stream_load_item.__set_user_ip(client_ip.GetString()); + ss << ", ClientIp: " << client_ip.GetString(); + } + + if (document.HasMember("Status")) { + const rapidjson::Value& status = document["Status"]; + stream_load_item.__set_status(status.GetString()); + ss << ", Status: " << status.GetString(); + } + + if (document.HasMember("Message")) { + const rapidjson::Value& message = document["Message"]; + stream_load_item.__set_message(message.GetString()); + ss << ", Message: 
" << message.GetString(); + } + + if (document.HasMember("ErrorURL")) { + const rapidjson::Value& error_url = document["ErrorURL"]; + stream_load_item.__set_url(error_url.GetString()); + ss << ", ErrorURL: " << error_url.GetString(); + } else { + stream_load_item.__set_url("N/A"); + ss << ", ErrorURL: N/A"; + } + + if (document.HasMember("NumberTotalRows")) { + const rapidjson::Value& total_rows = document["NumberTotalRows"]; + stream_load_item.__set_total_rows(total_rows.GetInt()); + ss << ", NumberTotalRows: " << total_rows.GetInt(); + } + + if (document.HasMember("NumberLoadedRows")) { + const rapidjson::Value& loaded_rows = document["NumberLoadedRows"]; + stream_load_item.__set_loaded_rows(loaded_rows.GetInt()); + ss << ", NumberLoadedRows: " << loaded_rows.GetInt(); + } + + if (document.HasMember("NumberFilteredRows")) { + const rapidjson::Value& filtered_rows = document["NumberFilteredRows"]; + stream_load_item.__set_filtered_rows(filtered_rows.GetInt()); + ss << ", NumberFilteredRows: " << filtered_rows.GetInt64(); + } + + if (document.HasMember("NumberUnselectedRows")) { + const rapidjson::Value& unselected_rows = document["NumberUnselectedRows"]; + stream_load_item.__set_unselected_rows(unselected_rows.GetInt()); + ss << ", NumberUnselectedRows: " << unselected_rows.GetInt64(); + } + + if (document.HasMember("LoadBytes")) { + const rapidjson::Value& load_bytes = document["LoadBytes"]; + stream_load_item.__set_load_bytes(load_bytes.GetInt()); + ss << ", LoadBytes: " << load_bytes.GetInt64(); + } + + if (document.HasMember("StartTime")) { + const rapidjson::Value& start_time = document["StartTime"]; + stream_load_item.__set_start_time(start_time.GetInt64()); + ss << ", StartTime: " << start_time.GetInt64(); + } + + if (document.HasMember("FinishTime")) { + const rapidjson::Value& finish_time = document["FinishTime"]; + stream_load_item.__set_finish_time(finish_time.GetInt64()); + ss << ", FinishTime: " << finish_time.GetInt64(); + } + + VLOG(1) << "parse 
json from rocksdb. " << ss.str(); +} + /* * The old mini load result format is as follows: * (which defined in src/util/json_util.cpp) diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index c0d61ab8a4ce54..aeca593605c762 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -83,7 +83,7 @@ class MessageBodySink; class StreamLoadContext { public: StreamLoadContext(ExecEnv* exec_env) : id(UniqueId::gen_uid()), _exec_env(exec_env), _refs(0) { - start_nanos = MonotonicNanos(); + start_millis = UnixMillis(); } ~StreamLoadContext() { @@ -96,6 +96,10 @@ class StreamLoadContext { } std::string to_json() const; + + std::string prepare_stream_load_record(const std::string& stream_load_record); + static void parse_stream_load_record(const std::string& stream_load_record, TStreamLoadRecord& stream_load_item); + // the old mini load result format is not same as stream load. // add this function for compatible with old mini load result format. 
std::string to_json_for_mini_load() const; @@ -169,9 +173,9 @@ class StreamLoadContext { int64_t number_filtered_rows = 0; int64_t number_unselected_rows = 0; int64_t loaded_bytes = 0; - int64_t start_nanos = 0; + int64_t start_millis = 0; int64_t start_write_data_nanos = 0; - int64_t load_cost_nanos = 0; + int64_t load_cost_millis = 0; int64_t begin_txn_cost_nanos = 0; int64_t stream_load_put_cost_nanos = 0; int64_t commit_and_publish_txn_cost_nanos = 0; diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 9d20e09c4a88aa..3c9a237b7ebc70 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -275,7 +275,7 @@ bool StreamLoadExecutor::collect_load_stat(StreamLoadContext* ctx, TTxnCommitAtt rl_attach.__set_unselectedRows(ctx->number_unselected_rows); rl_attach.__set_receivedBytes(ctx->receive_bytes); rl_attach.__set_loadedBytes(ctx->loaded_bytes); - rl_attach.__set_loadCostMs(ctx->load_cost_nanos / 1000 / 1000); + rl_attach.__set_loadCostMs(ctx->load_cost_millis); attach->rlTaskTxnCommitAttachment = std::move(rl_attach); attach->__isset.rlTaskTxnCommitAttachment = true; diff --git a/be/src/runtime/stream_load/stream_load_recorder.cpp b/be/src/runtime/stream_load/stream_load_recorder.cpp new file mode 100644 index 00000000000000..7029bc50af9234 --- /dev/null +++ b/be/src/runtime/stream_load/stream_load_recorder.cpp @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/stream_load/stream_load_recorder.h" + +#include "common/config.h" +#include "common/status.h" +#include "rocksdb/db.h" + +#include "rocksdb/options.h" +#include "rocksdb/slice.h" +#include "rocksdb/slice_transform.h" +#include "rocksdb/utilities/db_ttl.h" +#include "util/time.h" + + +namespace doris { +const std::string STREAM_LOAD_POSTFIX = "/stream_load"; + +StreamLoadRecorder::StreamLoadRecorder(const std::string& root_path) + : _root_path(root_path), + _db(nullptr), + _last_compaction_time(UnixMillis()) { +} + +StreamLoadRecorder::~StreamLoadRecorder() { + if (_db != nullptr) { + for (auto handle : _handles) { + _db->DestroyColumnFamilyHandle(handle); + handle = nullptr; + } + delete _db; + _db = nullptr; + } +} + +Status StreamLoadRecorder::init() { + // init db + rocksdb::DBOptions options; + options.IncreaseParallelism(); + options.create_if_missing = true; + options.create_missing_column_families = true; + std::string db_path = _root_path + STREAM_LOAD_POSTFIX; + std::vector column_families; + // default column family is required + column_families.emplace_back(DEFAULT_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions()); + std::vector ttls = {config::stream_load_record_expire_time_secs}; + rocksdb::Status s = rocksdb::DBWithTTL::Open(options, db_path, column_families, &_handles, &_db, ttls); + if (!s.ok() || _db == nullptr) { + LOG(WARNING) << "rocks db open failed, reason:" << s.ToString(); + return Status::InternalError("Stream load record rocksdb open failed, reason: " + s.ToString()); + } + return Status::OK(); +} + 
+Status StreamLoadRecorder::put(const std::string& key, const std::string& value) { + rocksdb::ColumnFamilyHandle* handle = _handles[0]; + rocksdb::WriteOptions write_options; + write_options.sync = false; + rocksdb::Status s = _db->Put(write_options, handle, rocksdb::Slice(key), rocksdb::Slice(value)); + if (!s.ok()) { + LOG(WARNING) << "rocks db put key:" << key << " failed, reason:" << s.ToString(); + return Status::InternalError("Stream load record rocksdb put failed, reason: " + s.ToString()); + } + + if ((UnixMillis() - _last_compaction_time) / 1000 > config::clean_stream_load_record_interval_secs) { + rocksdb::CompactRangeOptions options; + s = _db->CompactRange(options, _handles[0], nullptr, nullptr); + if (s.ok()) { + _last_compaction_time = UnixMillis(); + } + } + return Status::OK(); +} + +Status StreamLoadRecorder::get_batch(const std::string& start, const int batch_size, std::map<std::string, std::string>* stream_load_records) { + rocksdb::ColumnFamilyHandle* handle = _handles[0]; + std::unique_ptr<rocksdb::Iterator> it(_db->NewIterator(rocksdb::ReadOptions(), handle)); + if (start == "-1") { + it->SeekToFirst(); + } else { + it->Seek(start); + if (it->Valid()) { + it->Next(); + } else { + it->SeekToFirst(); + } + } + rocksdb::Status status = it->status(); + if (!status.ok()) { + LOG(WARNING) << "rocksdb seek failed. 
reason:" << status.ToString(); + return Status::InternalError("Stream load record rocksdb seek failed"); + } + int num = 0; + for (; it->Valid(); it->Next()) { + std::string key = it->key().ToString(); + std::string value = it->value().ToString(); + (*stream_load_records)[key] = value; + num++; + if (num >= batch_size) { + return Status::OK(); + } + } + return Status::OK(); +} + +} // namespace doris diff --git a/be/src/runtime/stream_load/stream_load_recorder.h b/be/src/runtime/stream_load/stream_load_recorder.h new file mode 100644 index 00000000000000..8cd5da32e01362 --- /dev/null +++ b/be/src/runtime/stream_load/stream_load_recorder.h @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <map> +#include <string> + +#include "olap/utils.h" +#include "rocksdb/utilities/db_ttl.h" + +#pragma once + +namespace doris { + +class Status; + +class StreamLoadRecorder { +public: + StreamLoadRecorder(const std::string& root_path); + + virtual ~StreamLoadRecorder(); + + Status init(); + + Status put(const std::string& key, const std::string& value); + + Status get_batch(const std::string& start, const int batch_size, std::map<std::string, std::string>* stream_load_records); + +private: + std::string _root_path; + rocksdb::DBWithTTL* _db; + std::vector<rocksdb::ColumnFamilyHandle*> _handles; + + AtomicInt64 _last_compaction_time; + + enum ColumnFamilyIndex { + DEFAULT_COLUMN_FAMILY_INDEX = 0, + }; + + const std::string DEFAULT_COLUMN_FAMILY = "default"; +}; + +} // namespace doris diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 6ade79cd725687..1153c9d260d819 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -24,6 +24,7 @@ #include #include +#include <map> #include #include "common/config.h" @@ -45,6 +46,7 @@ #include "runtime/result_buffer_mgr.h" #include "runtime/result_queue_mgr.h" #include "runtime/routine_load/routine_load_task_executor.h" +#include "runtime/stream_load/stream_load_context.h" #include "service/backend_options.h" #include "util/arrow/row_batch.h" #include "util/blocking_queue.hpp" @@ -312,4 +314,25 @@ void BackendService::close_scanner(TScanCloseResult& result_, const TScanClosePa result_.status = t_status; } +void BackendService::get_stream_load_record(TStreamLoadRecordResult& result, const int64_t last_stream_record_time) { + auto stream_load_recorder = StorageEngine::instance()->get_stream_load_recorder(); + if (stream_load_recorder != nullptr) { + std::map<std::string, std::string> records; + auto st = stream_load_recorder->get_batch(std::to_string(last_stream_record_time), config::stream_load_record_batch_size, &records); + if (st.ok()) { + LOG(INFO) << "get_batch stream_load_record rocksdb successfully. 
records size: " << records.size() << ", last_stream_load_timestamp: " << last_stream_record_time; + std::map stream_load_record_batch; + std::map::iterator it = records.begin(); + for (; it != records.end(); it++) { + TStreamLoadRecord stream_load_item; + StreamLoadContext::parse_stream_load_record(it->second, stream_load_item); + stream_load_record_batch.emplace(it->first.c_str(), stream_load_item); + } + result.__set_stream_load_record(stream_load_record_batch); + } + } else { + LOG(WARNING) << "stream_load_recorder is null."; + } +} + } // namespace doris diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h index b5b7adc9c8bec0..166a1366d82b04 100644 --- a/be/src/service/backend_service.h +++ b/be/src/service/backend_service.h @@ -60,6 +60,7 @@ class TSessionState; class TQueryOptions; class TExportTaskRequest; class TExportStatusResult; +class TStreamLoadRecordResult; // This class just forward rpc for actual handler // make this class because we can bind multiple service on single point @@ -140,6 +141,8 @@ class BackendService : public BackendServiceIf { // used for external service, close some context and release resource related with this context virtual void close_scanner(TScanCloseResult& result_, const TScanCloseParams& params); + virtual void get_stream_load_record(TStreamLoadRecordResult& result, const int64_t last_stream_record_time) override; + private: Status start_plan_fragment_execution(const TExecPlanFragmentParams& exec_params); ExecEnv* _exec_env; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index cd54710f846062..17fbcf01ee0dd8 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -157,6 +157,7 @@ import org.apache.doris.load.routineload.RoutineLoadManager; import org.apache.doris.load.routineload.RoutineLoadScheduler; import 
org.apache.doris.load.routineload.RoutineLoadTaskScheduler; +import org.apache.doris.load.StreamLoadRecordMgr; import org.apache.doris.master.Checkpoint; import org.apache.doris.master.MetaHelper; import org.apache.doris.meta.MetaContext; @@ -297,6 +298,7 @@ public class Catalog { private Load load; private LoadManager loadManager; + private StreamLoadRecordMgr streamLoadRecordMgr; private RoutineLoadManager routineLoadManager; private ExportMgr exportMgr; private Alter alter; @@ -546,6 +548,7 @@ private Catalog(boolean isCheckpointCatalog) { Config.async_loading_load_task_pool_size / 5, !isCheckpointCatalog); this.loadJobScheduler = new LoadJobScheduler(); this.loadManager = new LoadManager(loadJobScheduler); + this.streamLoadRecordMgr = new StreamLoadRecordMgr("stream_load_record_manager", Config.fetch_stream_load_record_interval_second * 1000); this.loadTimeoutChecker = new LoadTimeoutChecker(loadManager); this.loadEtlChecker = new LoadEtlChecker(loadManager); this.loadLoadingChecker = new LoadLoadingChecker(loadManager); @@ -1294,6 +1297,7 @@ private void startMasterOnlyDaemonThreads() { dynamicPartitionScheduler.start(); // start daemon thread to update db used data quota for db txn manager periodly updateDbUsedDataQuotaDaemon.start(); + streamLoadRecordMgr.start(); } // start threads that should running on all FE @@ -4785,6 +4789,10 @@ public LoadManager getLoadManager() { return loadManager; } + public StreamLoadRecordMgr getStreamLoadRecordMgr() { + return streamLoadRecordMgr; + } + public MasterTaskExecutor getPendingLoadTaskScheduler() { return pendingLoadTaskScheduler; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/AuditLog.java b/fe/fe-core/src/main/java/org/apache/doris/common/AuditLog.java index 235d783f2e6ed2..adc010361029e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/AuditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/AuditLog.java @@ -26,6 +26,7 @@ public class AuditLog { public static final 
AuditLog SLOW_AUDIT = new AuditLog("audit.slow_query"); public static final AuditLog QUERY_AUDIT = new AuditLog("audit.query"); public static final AuditLog LOAD_AUDIT = new AuditLog("audit.load"); + public static final AuditLog STREAM_LOAD_AUDIT = new AuditLog("audit.stream_load"); private Logger logger; @@ -41,6 +42,10 @@ public static AuditLog getLoadAudit() { return LOAD_AUDIT; } + public static AuditLog getStreamLoadAudit() { + return STREAM_LOAD_AUDIT; + } + public AuditLog(String auditName) { logger = LogManager.getLogger(auditName); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 27206c664a39cc..46c8885325309d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -103,7 +103,7 @@ public class Config extends ConfigBase { */ @ConfField public static String audit_log_dir = PaloFe.DORIS_HOME_DIR + "/log"; @ConfField public static int audit_log_roll_num = 90; - @ConfField public static String[] audit_log_modules = {"slow_query", "query", "load"}; + @ConfField public static String[] audit_log_modules = {"slow_query", "query", "load", "stream_load"}; @ConfField(mutable = true) public static long qe_slow_log_ms = 5000; @ConfField public static String audit_log_roll_interval = "DAY"; @ConfField public static String audit_log_delete_age = "30d"; @@ -584,7 +584,13 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true, masterOnly = true) public static int desired_max_waiting_jobs = 100; - + + /** + * fetch stream load record interval. 
+ */ + @ConfField(mutable = true, masterOnly = true) + public static int fetch_stream_load_record_interval_second = 120; + /** * maximum concurrent running txn num including prepare, commit txns under a single db * txn manager will reject coming txns diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index cec18127f3dbaf..89547b6ad22015 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -43,6 +43,7 @@ import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo; import org.apache.doris.load.loadv2.LoadJobFinalOperation; import org.apache.doris.load.routineload.RoutineLoadJob; +import org.apache.doris.load.StreamLoadRecordMgr.FetchStreamLoadRecord; import org.apache.doris.master.Checkpoint; import org.apache.doris.mysql.privilege.UserPropertyInfo; import org.apache.doris.persist.AlterRoutineLoadJobOperationLog; @@ -508,6 +509,11 @@ public void readFields(DataInput in) throws IOException { isRead = true; break; } + case OperationType.OP_FETCH_STREAM_LOAD_RECORD: { + data = FetchStreamLoadRecord.read(in); + isRead = true; + break; + } case OperationType.OP_CREATE_RESOURCE: { data = Resource.read(in); isRead = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java new file mode 100644 index 00000000000000..b1375667e2bcce --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java @@ -0,0 +1,177 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.common.ClientPool; +import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.plugin.AuditEvent; +import org.apache.doris.plugin.AuditEvent.EventType; +import org.apache.doris.plugin.StreamLoadAuditEvent; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.BackendService; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TStreamLoadRecord; +import org.apache.doris.thrift.TStreamLoadRecordResult; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import com.google.common.collect.Maps; +import com.google.common.collect.ImmutableMap; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Map; + +public class StreamLoadRecordMgr extends MasterDaemon { + private static final Logger LOG = LogManager.getLogger(StreamLoadRecordMgr.class); + + public StreamLoadRecordMgr(String name, long intervalMs) { + super(name, intervalMs); + } + + @Override + protected void runAfterCatalogReady() { + ImmutableMap backends = Catalog.getCurrentSystemInfo().getIdToBackend(); + long start = System.currentTimeMillis(); + int 
pullRecordSize = 0; + Map beIdToLastStreamLoad = Maps.newHashMap(); + for (Backend backend : backends.values()) { + BackendService.Client client = null; + TNetworkAddress address = null; + boolean ok = false; + try { + address = new TNetworkAddress(backend.getHost(), backend.getBePort()); + client = ClientPool.backendPool.borrowObject(address); + TStreamLoadRecordResult result = client.getStreamLoadRecord(backend.getLastStreamLoadTime()); + Map streamLoadRecordBatch = result.getStreamLoadRecord(); + pullRecordSize += streamLoadRecordBatch.size(); + long lastStreamLoadTime = -1; + for (Map.Entry entry : streamLoadRecordBatch.entrySet()) { + TStreamLoadRecord streamLoadItem= entry.getValue(); + String startTime = TimeUtils.longToTimeString(streamLoadItem.getStartTime(), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")); + String finishTime = TimeUtils.longToTimeString(streamLoadItem.getFinishTime(), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")); + LOG.debug("receive stream load record info from backend: {}. 
label: {}, db: {}, tbl: {}, user: {}, user_ip: {}," + + " status: {}, message: {}, error_url: {}, total_rows: {}, loaded_rows: {}, filtered_rows: {}," + + " unselected_rows: {}, load_bytes: {}, start_time: {}, finish_time: {}.", + backend.getHost(), streamLoadItem.getLabel(), streamLoadItem.getDb(), streamLoadItem.getTbl(), streamLoadItem.getUser(), streamLoadItem.getUserIp(), + streamLoadItem.getStatus(), streamLoadItem.getMessage(), streamLoadItem.getUrl(), streamLoadItem.getTotalRows(), streamLoadItem.getLoadedRows(), + streamLoadItem.getFilteredRows(), streamLoadItem.getUnselectedRows(), streamLoadItem.getLoadBytes(), startTime, finishTime); + + AuditEvent auditEvent = new StreamLoadAuditEvent.AuditEventBuilder().setEventType(EventType.STREAM_LOAD_FINISH) + .setLabel(streamLoadItem.getLabel()).setDb(streamLoadItem.getDb()).setTable(streamLoadItem.getTbl()) + .setUser(streamLoadItem.getUser()).setClientIp(streamLoadItem.getUserIp()).setStatus(streamLoadItem.getStatus()) + .setMessage(streamLoadItem.getMessage()).setUrl(streamLoadItem.getUrl()).setTotalRows(streamLoadItem.getTotalRows()) + .setLoadedRows(streamLoadItem.getLoadedRows()).setFilteredRows(streamLoadItem.getFilteredRows()) + .setUnselectedRows(streamLoadItem.getUnselectedRows()).setLoadBytes(streamLoadItem.getLoadBytes()) + .setStartTime(startTime).setFinishTime(finishTime).build(); + Catalog.getCurrentCatalog().getAuditEventProcessor().handleAuditEvent(auditEvent); + if (entry.getValue().getFinishTime() > lastStreamLoadTime) { + lastStreamLoadTime = entry.getValue().getFinishTime(); + } + } + if (streamLoadRecordBatch.size() > 0) { + backend.setLastStreamLoadTime(lastStreamLoadTime); + beIdToLastStreamLoad.put(backend.getId(), lastStreamLoadTime); + } else { + beIdToLastStreamLoad.put(backend.getId(), backend.getLastStreamLoadTime()); + } + + ok = true; + } catch (Exception e) { + LOG.warn("task exec error. 
backend[{}]", backend.getId(), e); + } finally { + if (ok) { + ClientPool.backendPool.returnObject(address, client); + } else { + ClientPool.backendPool.invalidateObject(address, client); + } + } + } + LOG.info("finished to pull stream load records of all backends. record size: {}, cost: {} ms", + pullRecordSize, (System.currentTimeMillis() - start)); + if (pullRecordSize > 0) { + FetchStreamLoadRecord fetchStreamLoadRecord = new FetchStreamLoadRecord(beIdToLastStreamLoad); + Catalog.getCurrentCatalog().getEditLog().logFetchStreamLoadRecord(fetchStreamLoadRecord); + } + } + + public void replayFetchStreamLoadRecord(FetchStreamLoadRecord fetchStreamLoadRecord) { + ImmutableMap backends = Catalog.getCurrentSystemInfo().getIdToBackend(); + Map beIdToLastStreamLoad = fetchStreamLoadRecord.getBeIdToLastStreamLoad(); + for (Backend backend : backends.values()) { + if (beIdToLastStreamLoad.containsKey(backend.getId())) { + long lastStreamLoadTime = beIdToLastStreamLoad.get(backend.getId()); + LOG.info("Replay stream load bdbje. backend: {}, last stream load version: {}", backend.getHost(), lastStreamLoadTime); + backend.setLastStreamLoadTime(lastStreamLoadTime); + } + } + } + + public static class FetchStreamLoadRecord implements Writable { + private Map beIdToLastStreamLoad; + + public FetchStreamLoadRecord(Map beIdToLastStreamLoad) { + this.beIdToLastStreamLoad = beIdToLastStreamLoad; + } + + public void setBeIdToLastStreamLoad(Map beIdToLastStreamLoad) { + this.beIdToLastStreamLoad = beIdToLastStreamLoad; + } + + public Map getBeIdToLastStreamLoad() { + return beIdToLastStreamLoad; + } + + @Override + public void write(DataOutput out) throws IOException { + for (Map.Entry entry : beIdToLastStreamLoad.entrySet()) { + out.writeBoolean(true); + out.writeLong(entry.getKey()); + out.writeBoolean(true); + out.writeLong(entry.getValue()); + LOG.debug("Write stream load bdbje. 
key: {}, value: {} ", entry.getKey(), entry.getValue()); + } + } + + public static FetchStreamLoadRecord read(DataInput in) throws IOException { + Map idToLastStreamLoad = Maps.newHashMap(); + int beNum = Catalog.getCurrentSystemInfo().getIdToBackend().size(); + for (int i = 0; i < beNum; i++) { + long beId = -1; + long lastStreamLoad = -1; + if (in.readBoolean()) { + beId = in.readLong(); + } + if (in.readBoolean()) { + lastStreamLoad = in.readLong(); + } + if (beId != -1 && lastStreamLoad != -1) { + idToLastStreamLoad.put(beId, lastStreamLoad); + } + LOG.debug("Read stream load bdbje. key: {}, value: {} ", beId, lastStreamLoad); + } + FetchStreamLoadRecord fetchStreamLoadRecord = new FetchStreamLoadRecord(idToLastStreamLoad); + return fetchStreamLoadRecord; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 5f7e19418f7b6f..42a240fd8bf98a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -55,6 +55,7 @@ import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo; import org.apache.doris.load.loadv2.LoadJobFinalOperation; import org.apache.doris.load.routineload.RoutineLoadJob; +import org.apache.doris.load.StreamLoadRecordMgr.FetchStreamLoadRecord; import org.apache.doris.meta.MetaContext; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.privilege.UserPropertyInfo; @@ -691,6 +692,11 @@ public static void loadJournal(Catalog catalog, JournalEntity journal) { catalog.getLoadManager().replayUpdateLoadJobStateInfo(info); break; } + case OperationType.OP_FETCH_STREAM_LOAD_RECORD: { + FetchStreamLoadRecord fetchStreamLoadRecord = (FetchStreamLoadRecord) journal.getData(); + catalog.getStreamLoadRecordMgr().replayFetchStreamLoadRecord(fetchStreamLoadRecord); + break; + } case OperationType.OP_CREATE_RESOURCE: { final Resource resource = 
(Resource) journal.getData(); catalog.getResourceMgr().replayCreateResource(resource); @@ -1292,6 +1298,10 @@ public void logUpdateLoadJob(LoadJobStateUpdateInfo info) { logEdit(OperationType.OP_UPDATE_LOAD_JOB, info); } + public void logFetchStreamLoadRecord(FetchStreamLoadRecord fetchStreamLoadRecord) { + logEdit(OperationType.OP_FETCH_STREAM_LOAD_RECORD, fetchStreamLoadRecord); + } + public void logCreateResource(Resource resource) { logEdit(OperationType.OP_CREATE_RESOURCE, resource); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index 08baea3dfe730a..fa18b981a5b7a6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -164,6 +164,8 @@ public class OperationType { public static final short OP_END_LOAD_JOB = 231; // update job info, used by spark load public static final short OP_UPDATE_LOAD_JOB = 232; + // fetch stream load record + public static final short OP_FETCH_STREAM_LOAD_RECORD = 233; // small files 251~260 public static final short OP_CREATE_SMALL_FILE = 251; diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java index c02cdd8c6c6e58..af789184615106 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java @@ -36,7 +36,8 @@ public enum EventType { DISCONNECTION, BEFORE_QUERY, AFTER_QUERY, - LOAD_SUCCEED + LOAD_SUCCEED, + STREAM_LOAD_FINISH } @Retention(RetentionPolicy.RUNTIME) diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/StreamLoadAuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/StreamLoadAuditEvent.java new file mode 100644 index 00000000000000..04b15b9264c958 --- /dev/null +++ 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/StreamLoadAuditEvent.java @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.plugin; + +public class StreamLoadAuditEvent extends AuditEvent { + + @AuditField(value = "Label") + public String label = ""; + @AuditField(value = "Table") + public String table = ""; + @AuditField(value = "ClientIp") + public String clientIp = ""; + @AuditField(value = "Status") + public String status = ""; + @AuditField(value = "Message") + public String message = ""; + @AuditField(value = "Url") + public String url = ""; + @AuditField(value = "TotalRows") + public long totalRows = -1; + @AuditField(value = "LoadedRows") + public long loadedRows = -1; + @AuditField(value = "FilteredRows") + public long filteredRows = -1; + @AuditField(value = "UnselectedRows") + public long unselectedRows = -1; + @AuditField(value = "LoadBytes") + public long loadBytes = -1; + @AuditField(value = "StartTime") + public String startTime = ""; + @AuditField(value = "FinishTime") + public String finishTime = ""; + + public static class AuditEventBuilder { + + private StreamLoadAuditEvent auditEvent = new StreamLoadAuditEvent(); + + public AuditEventBuilder() { + } + 
+ public void reset() { + auditEvent = new StreamLoadAuditEvent(); + } + + public AuditEventBuilder setEventType(EventType eventType) { + auditEvent.type = eventType; + return this; + } + + public AuditEventBuilder setLabel(String label) { + auditEvent.label = label; + return this; + } + + public AuditEventBuilder setDb(String db) { + auditEvent.db = db; + return this; + } + + public AuditEventBuilder setTable(String table) { + auditEvent.table = table; + return this; + } + + public AuditEventBuilder setUser(String user) { + auditEvent.user = user; + return this; + } + + public AuditEventBuilder setClientIp(String clientIp) { + auditEvent.clientIp = clientIp; + return this; + } + + public AuditEventBuilder setStatus(String status) { + auditEvent.status = status; + return this; + } + + public AuditEventBuilder setMessage(String message) { + auditEvent.message = message; + return this; + } + + public AuditEventBuilder setUrl(String url) { + auditEvent.url = url; + return this; + } + + public AuditEventBuilder setTotalRows(long totalRows) { + auditEvent.totalRows = totalRows; + return this; + } + + public AuditEventBuilder setLoadedRows(long loadedRows) { + auditEvent.loadedRows = loadedRows; + return this; + } + + public AuditEventBuilder setFilteredRows(long filteredRows) { + auditEvent.filteredRows = filteredRows; + return this; + } + + public AuditEventBuilder setUnselectedRows(long unselectedRows) { + auditEvent.unselectedRows = unselectedRows; + return this; + } + + public AuditEventBuilder setLoadBytes(long loadBytes) { + auditEvent.loadBytes = loadBytes; + return this; + } + + public AuditEventBuilder setStartTime(String startTime) { + auditEvent.startTime = startTime; + return this; + } + + public AuditEventBuilder setFinishTime(String finishTime) { + auditEvent.finishTime = finishTime; + return this; + } + + public AuditEvent build() { + return this.auditEvent; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java index 3fd1b0a89731c5..c940e0d2f4d99c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java @@ -51,11 +51,18 @@ public class AuditLogBuilder extends Plugin implements AuditPlugin { private Set loadAnnotationSet; + private final String[] STREAM_LOAD_ANNONATION_NAMES = {"Label", "Db", "Table", "User", "ClientIp", + "Status", "Message", "Url", "TotalRows", "LoadedRows", "FilteredRows", "UnselectedRows", + "LoadBytes", "StartTime", "FinishTime"}; + + private Set streamLoadAnnotationSet; + public AuditLogBuilder() { pluginInfo = new PluginInfo(PluginMgr.BUILTIN_PLUGIN_PREFIX + "AuditLogBuilder", PluginType.AUDIT, "builtin audit logger", DigitalVersion.fromString("0.12.0"), DigitalVersion.fromString("1.8.31"), AuditLogBuilder.class.getName(), null, null); loadAnnotationSet = Sets.newHashSet(LOAD_ANNONATION_NAMES); + streamLoadAnnotationSet = Sets.newHashSet(STREAM_LOAD_ANNONATION_NAMES); } public PluginInfo getPluginInfo() { @@ -64,7 +71,7 @@ public PluginInfo getPluginInfo() { @Override public boolean eventFilter(EventType type) { - return type == EventType.AFTER_QUERY || type == EventType.LOAD_SUCCEED; + return type == EventType.AFTER_QUERY || type == EventType.LOAD_SUCCEED || type == EventType.STREAM_LOAD_FINISH; } @Override @@ -77,6 +84,9 @@ public void exec(AuditEvent event) { case LOAD_SUCCEED: auditLoadLog(event); break; + case STREAM_LOAD_FINISH: + auditStreamLoadLog(event); + break; default: break; } @@ -132,4 +142,22 @@ private void auditLoadLog(AuditEvent event) throws IllegalAccessException { String auditLog = sb.toString(); AuditLog.getLoadAudit().log(auditLog); } + + private void auditStreamLoadLog(AuditEvent event) throws IllegalAccessException { + Field[] fields = event.getClass().getFields(); + Map annotationToFieldValueMap = Maps.newHashMap(); + for (Field f : fields) { + AuditField af = 
f.getAnnotation(AuditField.class); + if (af == null || !streamLoadAnnotationSet.contains(af.value())) { + continue; + } + annotationToFieldValueMap.put(af.value(), String.valueOf(f.get(event))); + } + StringBuilder sb = new StringBuilder(); + for (String annotation : STREAM_LOAD_ANNONATION_NAMES) { + sb.append("|").append(annotation).append("=").append(annotationToFieldValueMap.get(annotation)); + } + String auditLog = sb.toString(); + AuditLog.getStreamLoadAudit().log(auditLog); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index b47506ea23b9e7..bfe1ce9ce93729 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -95,6 +95,8 @@ public enum BackendState { // additional backendStatus information for BE, display in JSON format private BackendStatus backendStatus = new BackendStatus(); + private long lastStreamLoadTime = -1; + public Backend() { this.host = ""; this.version = ""; @@ -112,6 +114,8 @@ public Backend() { this.backendState = BackendState.free.ordinal(); this.decommissionType = DecommissionType.SystemDecommission.ordinal(); + + this.lastStreamLoadTime = -1; } public Backend(long id, String host, int heartbeatPort) { @@ -132,6 +136,8 @@ public Backend(long id, String host, int heartbeatPort) { this.ownerClusterName = ""; this.backendState = BackendState.free.ordinal(); this.decommissionType = DecommissionType.SystemDecommission.ordinal(); + + this.lastStreamLoadTime = -1; } public long getId() { @@ -170,6 +176,12 @@ public String getHeartbeatErrMsg() { return heartbeatErrMsg; } + public long getLastStreamLoadTime() { return lastStreamLoadTime; } + + public void setLastStreamLoadTime(long lastStreamLoadTime) { + this.lastStreamLoadTime = lastStreamLoadTime; + } + // for test only public void updateOnce(int bePort, int httpPort, int beRpcPort) { if (this.bePort != bePort) 
{ diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java index b8bb7d93001afa..b8a6152151e35b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java @@ -45,6 +45,7 @@ import org.apache.doris.thrift.TScanOpenResult; import org.apache.doris.thrift.TSnapshotRequest; import org.apache.doris.thrift.TStatus; +import org.apache.doris.thrift.TStreamLoadRecordResult; import org.apache.doris.thrift.TTabletStatResult; import org.apache.doris.thrift.TTransmitDataParams; import org.apache.doris.thrift.TTransmitDataResult; @@ -218,6 +219,12 @@ public TScanBatchResult getNext(TScanNextBatchParams params) throws TException { public TScanCloseResult closeScanner(TScanCloseParams params) throws TException { return null; } + + @Override + public TStreamLoadRecordResult getStreamLoadRecord(long last_stream_record_time) throws TException { + // TODO Auto-generated method stub + return null; + } } @Test diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java index 06b6ecd729a209..64a833cfb890de 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java @@ -65,6 +65,7 @@ import org.apache.doris.thrift.TSnapshotRequest; import org.apache.doris.thrift.TStatus; import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.thrift.TStreamLoadRecordResult; import org.apache.doris.thrift.TTabletStatResult; import org.apache.doris.thrift.TTransmitDataParams; import org.apache.doris.thrift.TTransmitDataResult; @@ -281,6 +282,11 @@ public TScanBatchResult getNext(TScanNextBatchParams params) throws TException { public TScanCloseResult closeScanner(TScanCloseParams params) 
throws TException { return null; } + + @Override + public TStreamLoadRecordResult getStreamLoadRecord(long last_stream_record_time) throws TException { + return new TStreamLoadRecordResult(Maps.newHashMap()); + } } // The default Brpc service. diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index 5b53c7f5a46d5a..200a8ead956534 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -80,6 +80,31 @@ struct TProxyResult { 2: optional TKafkaMetaProxyResult kafka_meta_result; } +struct TStreamLoadRecord { + 1: optional string cluster + 2: required string user + 3: required string passwd + 4: required string db + 5: required string tbl + 6: optional string user_ip + 7: required string label + 8: required string status + 9: required string message + 10: optional string url + 11: optional i64 auth_code; + 12: required i64 total_rows + 13: required i64 loaded_rows + 14: required i64 filtered_rows + 15: required i64 unselected_rows + 16: required i64 load_bytes + 17: required i64 start_time + 18: required i64 finish_time +} + +struct TStreamLoadRecordResult { + 1: required map stream_load_record +} + service BackendService { // Called by coord to start asynchronous execution of plan fragment in backend. // Returns as soon as all incoming data streams have been set up. @@ -134,4 +159,6 @@ service BackendService { // release the context resource associated with the context_id DorisExternalService.TScanCloseResult close_scanner(1: DorisExternalService.TScanCloseParams params); + TStreamLoadRecordResult get_stream_load_record(1: i64 last_stream_record_time); + }