From ef96fc2b6067dbafa251cf0e1a47769dfd8a14d5 Mon Sep 17 00:00:00 2001 From: weizuo Date: Wed, 10 Mar 2021 21:38:54 +0800 Subject: [PATCH 1/6] store stream load record in rocksdb --- be/src/common/config.h | 8 + be/src/http/action/stream_load.cpp | 25 ++- be/src/http/action/stream_load.h | 1 + be/src/olap/storage_engine.cpp | 26 ++- be/src/olap/storage_engine.h | 7 + be/src/runtime/CMakeLists.txt | 1 + .../routine_load_task_executor.cpp | 2 +- .../stream_load/stream_load_context.cpp | 101 +++++++++- .../runtime/stream_load/stream_load_context.h | 9 +- .../stream_load/stream_load_executor.cpp | 2 +- .../stream_load/stream_load_record.cpp | 125 +++++++++++++ .../runtime/stream_load/stream_load_record.h | 57 ++++++ be/src/service/backend_service.cpp | 23 +++ be/src/service/backend_service.h | 3 + .../org/apache/doris/catalog/Catalog.java | 8 + .../org/apache/doris/common/AuditLog.java | 5 + .../java/org/apache/doris/common/Config.java | 10 +- .../apache/doris/journal/JournalEntity.java | 6 + .../doris/load/StreamLoadRecordMgr.java | 175 ++++++++++++++++++ .../org/apache/doris/persist/EditLog.java | 10 + .../apache/doris/persist/OperationType.java | 2 + .../org/apache/doris/plugin/AuditEvent.java | 3 +- .../doris/plugin/StreamLoadAuditEvent.java | 144 ++++++++++++++ .../org/apache/doris/qe/AuditLogBuilder.java | 30 ++- .../java/org/apache/doris/system/Backend.java | 12 ++ .../apache/doris/common/GenericPoolTest.java | 7 + .../doris/utframe/MockedBackendFactory.java | 6 + gensrc/thrift/BackendService.thrift | 27 +++ 28 files changed, 821 insertions(+), 14 deletions(-) create mode 100644 be/src/runtime/stream_load/stream_load_record.cpp create mode 100644 be/src/runtime/stream_load/stream_load_record.h create mode 100644 fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/plugin/StreamLoadAuditEvent.java diff --git a/be/src/common/config.h b/be/src/common/config.h index f1547f1180d7b0..31012f9f4d8c9e 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -348,6 +348,14 @@ CONF_mInt32(streaming_load_rpc_max_alive_time_sec, "1200"); CONF_Int32(tablet_writer_open_rpc_timeout_sec, "60"); // You can ignore brpc error '[E1011]The server is overcrowded' when writing data. CONF_mBool(tablet_writer_ignore_eovercrowded, "false"); +// batch size of stream load record reported to FE +CONF_mInt32(stream_load_record_batch_size, "50"); +// expire time of stream load record in rocksdb. +CONF_Int32(stream_load_record_expire_time_secs, "28800"); +// time interval to clean expired stream load records +CONF_mInt64(clean_stream_load_record_interval_secs, "1800"); +// the storage path of stream load record rocksdb +CONF_String(stream_load_record_path, ""); // OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc. // You may need to lower the speed when the sink receiver bes are too busy. 
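Note on configuration: the four CONF_* entries added above surface as same-named keys in be.conf. A hedged sketch of what enabling the feature might look like there (values mirror the defaults above; the path is illustrative only, and patch 2/6 below later defaults stream_load_record_path to ${DORIS_HOME}):

    # hypothetical be.conf excerpt
    stream_load_record_batch_size = 50               # max records returned per FE fetch
    stream_load_record_expire_time_secs = 28800      # rocksdb TTL, i.e. 8 hours
    clean_stream_load_record_interval_secs = 1800    # minimum gap between TTL compactions
    stream_load_record_path = /data/doris/be         # rocksdb opens <path>/stream_load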
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 164676c6d62031..5db8f9fe1bfac1 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -39,6 +39,7 @@ #include "http/http_request.h" #include "http/http_response.h" #include "http/utils.h" +#include "olap/storage_engine.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" @@ -48,6 +49,7 @@ #include "runtime/stream_load/stream_load_context.h" #include "runtime/stream_load/stream_load_executor.h" #include "runtime/stream_load/stream_load_pipe.h" +#include "runtime/stream_load/stream_load_record.h" #include "util/byte_buffer.h" #include "util/debug_util.h" #include "util/doris_metrics.h" @@ -143,7 +145,7 @@ void StreamLoadAction::handle(HttpRequest* req) { << ", errmsg=" << ctx->status.get_error_msg(); } } - ctx->load_cost_nanos = MonotonicNanos() - ctx->start_nanos; + ctx->load_cost_micros = UnixMicros() - ctx->start_micros; if (!ctx->status.ok() && ctx->status.code() != TStatusCode::PUBLISH_TIMEOUT) { if (ctx->need_rollback) { @@ -160,9 +162,11 @@ void StreamLoadAction::handle(HttpRequest* req) { str = str + '\n'; HttpChannel::send_reply(req, str); + _sava_stream_load_record(ctx, str); + // update statstics streaming_load_requests_total->increment(1); - streaming_load_duration_ms->increment(ctx->load_cost_nanos / 1000000); + streaming_load_duration_ms->increment(ctx->load_cost_micros / 1000); streaming_load_bytes->increment(ctx->receive_bytes); streaming_load_current_processing->increment(-1); } @@ -228,7 +232,9 @@ int StreamLoadAction::on_header(HttpRequest* req) { // add new line at end str = str + '\n'; HttpChannel::send_reply(req, str); - streaming_load_current_processing->increment(-1); + + _sava_stream_load_record(ctx, str); + return -1; } return 0; @@ -533,4 +539,17 @@ Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* file_pa return Status::OK(); } +void StreamLoadAction::_sava_stream_load_record(StreamLoadContext* ctx, const std::string& str) { + auto stream_load_record = StorageEngine::instance()->get_stream_load_record(); + if (stream_load_record != nullptr) { + std::string key = ToStringFromUnixMicros(ctx->start_micros + ctx->load_cost_micros) + "_" + ctx->label; + auto st = stream_load_record->put(key, str); + if (st.ok()) { + LOG(INFO) << "put stream_load_record rocksdb successfully. label: " << ctx->label << ", key: " << key; + } + } else { + LOG(WARNING) << "put stream_load_record rocksdb failed. 
stream_load_record is null."; + } +} + } // namespace doris diff --git a/be/src/http/action/stream_load.h b/be/src/http/action/stream_load.h index 9f7bc2fddc2e11..d2b936716039d5 100644 --- a/be/src/http/action/stream_load.h +++ b/be/src/http/action/stream_load.h @@ -50,6 +50,7 @@ class StreamLoadAction : public HttpHandler { Status _data_saved_path(HttpRequest* req, std::string* file_path); Status _execute_plan_fragment(StreamLoadContext* ctx); Status _process_put(HttpRequest* http_req, StreamLoadContext* ctx); + void _sava_stream_load_record(StreamLoadContext* ctx, const std::string& str); private: ExecEnv* _exec_env; diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 670cd2103243bd..9f5764ad64991c 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -120,7 +120,8 @@ StorageEngine::StorageEngine(const EngineOptions& options) _rowset_id_generator(new UniqueRowsetIdGenerator(options.backend_uid)), _memtable_flush_executor(nullptr), _default_rowset_type(ALPHA_ROWSET), - _heartbeat_flags(nullptr) { + _heartbeat_flags(nullptr), + _stream_load_record(nullptr) { if (_s_instance == nullptr) { _s_instance = this; } @@ -224,6 +225,29 @@ Status StorageEngine::_init_store_map() { for (auto store : tmp_stores) { _store_map.emplace(store->path(), store); } + + RETURN_NOT_OK_STATUS_WITH_WARN(_init_stream_load_record(), "init StreamLoadRecord failed"); + + return Status::OK(); +} + +Status StorageEngine::_init_stream_load_record() { + std::string stream_load_record_path = config::stream_load_record_path; + LOG(INFO) << "stream load record path: " << stream_load_record_path; + + // init stream load record rocksdb + _stream_load_record.reset(new StreamLoadRecord(stream_load_record_path)); + if (_stream_load_record == nullptr) { + RETURN_NOT_OK_STATUS_WITH_WARN( + Status::MemoryAllocFailed("allocate memory for StreamLoadRecord failed"), + "new StreamLoadRecord failed"); + } + auto st = _stream_load_record->init(); + if (!st.ok()) { + RETURN_NOT_OK_STATUS_WITH_WARN( + Status::IOError(Substitute("open StreamLoadRecord rocksdb failed, path=$0", stream_load_record_path)), + "init StreamLoadRecord failed"); + } return Status::OK(); } diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 04964c96023ee4..2216199c029586 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -50,6 +50,7 @@ #include "olap/task/engine_task.h" #include "olap/txn_manager.h" #include "runtime/heartbeat_flags.h" +#include "runtime/stream_load/stream_load_record.h" #include "util/countdown_latch.h" #include "util/thread.h" #include "util/threadpool.h" @@ -174,6 +175,8 @@ class StorageEngine { void create_base_compaction(TabletSharedPtr best_tablet, std::shared_ptr& base_compaction); + std::shared_ptr get_stream_load_record() { return _stream_load_record; } + private: // Instance should be inited from `static open()` // MUST NOT be called in other circumstances. 
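The accessor added above is how other modules reach the record store. A minimal usage sketch, mirroring _sava_stream_load_record() in stream_load.cpp earlier in this patch (finish_micros is an illustrative stand-in for the StreamLoadContext timing fields):

    // Keys embed the finish timestamp, so RocksDB's ordered iteration
    // doubles as a time cursor the FE can resume from on each fetch.
    auto recorder = StorageEngine::instance()->get_stream_load_record();
    if (recorder != nullptr) {
        std::string key = ToStringFromUnixMicros(finish_micros) + "_" + ctx->label;
        recorder->put(key, ctx->to_json()); // value is the JSON reply sent to the client
    }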
@@ -242,6 +245,8 @@ class StorageEngine { void _push_tablet_into_submitted_compaction(TabletSharedPtr tablet); void _pop_tablet_from_submitted_compaction(TabletSharedPtr tablet); + Status _init_stream_load_record(); + private: struct CompactionCandidate { CompactionCandidate(uint32_t nicumulative_compaction_, int64_t tablet_id_, uint32_t index_) @@ -346,6 +351,8 @@ class StorageEngine { std::mutex _compaction_producer_sleep_mutex; std::condition_variable _compaction_producer_sleep_cv; + std::shared_ptr _stream_load_record; + DISALLOW_COPY_AND_ASSIGN(StorageEngine); }; diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 7bd7f541ff4a76..8004903a6859b5 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -91,6 +91,7 @@ set(RUNTIME_FILES message_body_sink.cpp stream_load/stream_load_context.cpp stream_load/stream_load_executor.cpp + stream_load/stream_load_record.cpp stream_load/load_stream_mgr.cpp routine_load/data_consumer.cpp routine_load/data_consumer_group.cpp diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index a54f657c1928d8..353ce7aff94ff2 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -256,7 +256,7 @@ void RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx, DataConsumerPool // wait for all consumers finished HANDLE_ERROR(ctx->future.get(), "consume failed"); - ctx->load_cost_nanos = MonotonicNanos() - ctx->start_nanos; + ctx->load_cost_micros = UnixMicros() - ctx->start_micros; // return the consumer back to pool // call this before commit txn, in case the next task can come very fast diff --git a/be/src/runtime/stream_load/stream_load_context.cpp b/be/src/runtime/stream_load/stream_load_context.cpp index 240ed08a8baa2a..fbdc364fa80b6a 100644 --- a/be/src/runtime/stream_load/stream_load_context.cpp +++ b/be/src/runtime/stream_load/stream_load_context.cpp @@ -31,6 +31,16 @@ std::string StreamLoadContext::to_json() const { // label writer.Key("Label"); writer.String(label.c_str()); + writer.Key("cluster"); + writer.String(auth.cluster.c_str()); + writer.Key("Db"); + writer.String(db.c_str()); + writer.Key("Table"); + writer.String(table.c_str()); + writer.Key("User"); + writer.String(auth.user.c_str()); + writer.Key("ClientIp"); + writer.String(auth.user_ip.c_str()); // status writer.Key("Status"); @@ -68,8 +78,12 @@ std::string StreamLoadContext::to_json() const { writer.Int64(number_unselected_rows); writer.Key("LoadBytes"); writer.Int64(receive_bytes); + writer.Key("StartTime"); + writer.String(ToStringFromUnixMicros(start_micros).c_str()); + writer.Key("FinishTime"); + writer.String(ToStringFromUnixMicros(start_micros + load_cost_micros).c_str()); writer.Key("LoadTimeMs"); - writer.Int64(load_cost_nanos / 1000000); + writer.Int64(load_cost_micros / 1000); writer.Key("BeginTxnTimeMs"); writer.Int64(begin_txn_cost_nanos / 1000000); writer.Key("StreamLoadPutTimeMs"); @@ -89,6 +103,91 @@ std::string StreamLoadContext::to_json() const { return s.GetString(); } +void StreamLoadContext::parse_stream_load_record(const std::string& stream_load_record, TStreamLoadRecord& stream_load_item) { + + rapidjson::Document document; + if (document.Parse(stream_load_record.data()).HasParseError()) { + return; + } + + if (document.HasMember("Label")) { + const rapidjson::Value& label = document["Label"]; + stream_load_item.__set_label(label.GetString()); + } + + 
if (document.HasMember("Db")) { + const rapidjson::Value& db = document["Db"]; + stream_load_item.__set_db(db.GetString()); + } + + if (document.HasMember("Table")) { + const rapidjson::Value& table = document["Table"]; + stream_load_item.__set_tbl(table.GetString()); + } + + if (document.HasMember("User")) { + const rapidjson::Value& user = document["User"]; + stream_load_item.__set_user(user.GetString()); + } + + if (document.HasMember("ClientIp")) { + const rapidjson::Value& client_ip = document["ClientIp"]; + stream_load_item.__set_user_ip(client_ip.GetString()); + } + + if (document.HasMember("Status")) { + const rapidjson::Value& status = document["Status"]; + stream_load_item.__set_status(status.GetString()); + } + + if (document.HasMember("Message")) { + const rapidjson::Value& message = document["Message"]; + stream_load_item.__set_message(message.GetString()); + } + + if (document.HasMember("ErrorURL")) { + const rapidjson::Value& error_url = document["ErrorURL"]; + stream_load_item.__set_url(error_url.GetString()); + } else { + stream_load_item.__set_url("N/A"); + } + + if (document.HasMember("NumberTotalRows")) { + const rapidjson::Value& total_rows = document["NumberTotalRows"]; + stream_load_item.__set_total_rows(total_rows.GetInt()); + } + + if (document.HasMember("NumberLoadedRows")) { + const rapidjson::Value& loaded_rows = document["NumberLoadedRows"]; + stream_load_item.__set_loaded_rows(loaded_rows.GetInt()); + } + + if (document.HasMember("NumberFilteredRows")) { + const rapidjson::Value& filtered_rows = document["NumberFilteredRows"]; + stream_load_item.__set_filtered_rows(filtered_rows.GetInt()); + } + + if (document.HasMember("NumberUnselectedRows")) { + const rapidjson::Value& unselected_rows = document["NumberUnselectedRows"]; + stream_load_item.__set_unselected_rows(unselected_rows.GetInt()); + } + + if (document.HasMember("LoadBytes")) { + const rapidjson::Value& load_bytes = document["LoadBytes"]; + stream_load_item.__set_load_bytes(load_bytes.GetInt()); + } + + if (document.HasMember("StartTime")) { + const rapidjson::Value& start_time = document["StartTime"]; + stream_load_item.__set_start_time(start_time.GetString()); + } + + if (document.HasMember("FinishTime")) { + const rapidjson::Value& finish_time = document["FinishTime"]; + stream_load_item.__set_finish_time(finish_time.GetString()); + } +} + /* * The old mini load result format is as follows: * (which defined in src/util/json_util.cpp) diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index c0d61ab8a4ce54..adc4e0583431d8 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -83,7 +83,7 @@ class MessageBodySink; class StreamLoadContext { public: StreamLoadContext(ExecEnv* exec_env) : id(UniqueId::gen_uid()), _exec_env(exec_env), _refs(0) { - start_nanos = MonotonicNanos(); + start_micros = UnixMicros(); } ~StreamLoadContext() { @@ -96,6 +96,9 @@ class StreamLoadContext { } std::string to_json() const; + + static void parse_stream_load_record(const std::string& stream_load_record, TStreamLoadRecord& stream_load_item); + // the old mini load result format is not same as stream load. // add this function for compatible with old mini load result format. 
std::string to_json_for_mini_load() const; @@ -169,9 +172,9 @@ class StreamLoadContext { int64_t number_filtered_rows = 0; int64_t number_unselected_rows = 0; int64_t loaded_bytes = 0; - int64_t start_nanos = 0; + int64_t start_micros = 0; int64_t start_write_data_nanos = 0; - int64_t load_cost_nanos = 0; + int64_t load_cost_micros = 0; int64_t begin_txn_cost_nanos = 0; int64_t stream_load_put_cost_nanos = 0; int64_t commit_and_publish_txn_cost_nanos = 0; diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 9d20e09c4a88aa..6ce64be7b749b4 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -275,7 +275,7 @@ bool StreamLoadExecutor::collect_load_stat(StreamLoadContext* ctx, TTxnCommitAtt rl_attach.__set_unselectedRows(ctx->number_unselected_rows); rl_attach.__set_receivedBytes(ctx->receive_bytes); rl_attach.__set_loadedBytes(ctx->loaded_bytes); - rl_attach.__set_loadCostMs(ctx->load_cost_nanos / 1000 / 1000); + rl_attach.__set_loadCostMs(ctx->load_cost_micros / 1000); attach->rlTaskTxnCommitAttachment = std::move(rl_attach); attach->__isset.rlTaskTxnCommitAttachment = true; diff --git a/be/src/runtime/stream_load/stream_load_record.cpp b/be/src/runtime/stream_load/stream_load_record.cpp new file mode 100644 index 00000000000000..caeb472ebbae81 --- /dev/null +++ b/be/src/runtime/stream_load/stream_load_record.cpp @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "runtime/stream_load/stream_load_record.h" + +#include "common/config.h" +#include "common/status.h" +#include "rocksdb/db.h" + +#include "rocksdb/options.h" +#include "rocksdb/slice.h" +#include "rocksdb/slice_transform.h" +#include "rocksdb/utilities/db_ttl.h" +#include "util/time.h" + + +namespace doris { +const std::string STREAM_LOAD_POSTFIX = "/stream_load"; + +StreamLoadRecord::StreamLoadRecord(const std::string& root_path) + : _root_path(root_path), + _db(nullptr), + _last_compaction_time(UnixMillis()) { +} + +StreamLoadRecord::~StreamLoadRecord() { + if (_db != nullptr) { + for (auto handle : _handles) { + _db->DestroyColumnFamilyHandle(handle); + handle = nullptr; + } + delete _db; + _db= nullptr; + } +} + +Status StreamLoadRecord::init() { + // init db + rocksdb::DBOptions options; + options.IncreaseParallelism(); + options.create_if_missing = true; + options.create_missing_column_families = true; + std::string db_path = _root_path + STREAM_LOAD_POSTFIX; + std::vector column_families; + // default column family is required + column_families.emplace_back(DEFAULT_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions()); + // stream load column family + column_families.emplace_back(STREAM_LOAD_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions()); + std::vector ttls = {config::stream_load_record_expire_time_secs, config::stream_load_record_expire_time_secs}; + rocksdb::Status s = rocksdb::DBWithTTL::Open(options, db_path, column_families, &_handles, &_db, ttls); + + if (!s.ok() || _db == nullptr) { + LOG(WARNING) << "rocks db open failed, reason:" << s.ToString(); + return Status::InternalError("Stream load record rocksdb open failed"); + } + return Status::OK(); +} + +Status StreamLoadRecord::put(const std::string& key, const std::string& value) { + rocksdb::ColumnFamilyHandle* handle = _handles[1]; + rocksdb::WriteOptions write_options; + write_options.sync = false; + rocksdb::Status s = _db->Put(write_options, handle, rocksdb::Slice(key), rocksdb::Slice(value)); + if (!s.ok()) { + LOG(WARNING) << "rocks db put key:" << key << " failed, reason:" << s.ToString(); + return Status::InternalError("Stream load record rocksdb put failed"); + } + + if ((UnixMillis() - _last_compaction_time) / 1000 > config::clean_stream_load_record_interval_secs) { + rocksdb::CompactRangeOptions options; + s = _db->CompactRange(options, _handles[1], nullptr, nullptr); + if (s.ok()) { + _last_compaction_time = UnixMillis(); + } + } + return Status::OK(); +} + +Status StreamLoadRecord::get_batch(const std::string& start, const int batch_size, std::map* stream_load_records) { + rocksdb::ColumnFamilyHandle* handle = _handles[1]; + std::unique_ptr it(_db->NewIterator(rocksdb::ReadOptions(), handle)); + if (start == "") { + it->SeekToFirst(); + } else { + it->Seek(start); + rocksdb::Status status = it->status(); + if (status.ok()) { + it->Next(); + } else { + it->SeekToFirst(); + } + } + rocksdb::Status status = it->status(); + if (!status.ok()) { + LOG(WARNING) << "rocksdb seek failed. 
reason:" << status.ToString(); + return Status::InternalError("Stream load record rocksdb seek failed"); + } + int num = 0; + for (; it->Valid(); it->Next()) { + std::string key = it->key().ToString(); + std::string value = it->value().ToString(); + (*stream_load_records)[key] = value; + num++; + if (num >= batch_size) { + return Status::OK(); + } + } + return Status::OK(); +} + +} // namespace doris diff --git a/be/src/runtime/stream_load/stream_load_record.h b/be/src/runtime/stream_load/stream_load_record.h new file mode 100644 index 00000000000000..a0eee8fe5ac830 --- /dev/null +++ b/be/src/runtime/stream_load/stream_load_record.h @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include + +#include "rocksdb/utilities/db_ttl.h" + +#pragma once + +namespace doris { + +class Status; + +class StreamLoadRecord { +public: + StreamLoadRecord(const std::string& root_path); + + virtual ~StreamLoadRecord(); + + Status init(); + + Status put(const std::string& key, const std::string& value); + + Status get_batch(const std::string& start, const int batch_size, std::map* stream_load_records); + +private: + std::string _root_path; + rocksdb::DBWithTTL* _db; + std::vector _handles; + + int64_t _last_compaction_time; + + enum ColumnFamilyIndex { + DEFAULT_COLUMN_FAMILY_INDEX = 0, + STREAM_LOAD_COLUMN_FAMILY_INDEX + }; + + const std::string DEFAULT_COLUMN_FAMILY = "default"; + const std::string STREAM_LOAD_COLUMN_FAMILY = "stream_load"; +}; + +} // namespace doris diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 6ade79cd725687..4e2727ead2405a 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -24,6 +24,7 @@ #include #include +#include #include #include "common/config.h" @@ -45,6 +46,7 @@ #include "runtime/result_buffer_mgr.h" #include "runtime/result_queue_mgr.h" #include "runtime/routine_load/routine_load_task_executor.h" +#include "runtime/stream_load/stream_load_context.h" #include "service/backend_options.h" #include "util/arrow/row_batch.h" #include "util/blocking_queue.hpp" @@ -312,4 +314,25 @@ void BackendService::close_scanner(TScanCloseResult& result_, const TScanClosePa result_.status = t_status; } +void BackendService::get_stream_load_record(TStreamLoadRecordResult& result, const std::string& params) { + auto stream_load_record = StorageEngine::instance()->get_stream_load_record(); + if (stream_load_record != nullptr) { + std::map records; + auto st = stream_load_record->get_batch(params, config::stream_load_record_batch_size, &records); + if (st.ok()) { + LOG(INFO) << "get_batch stream_load_record rocksdb successfully. 
records size: " << records.size(); + std::map stream_load_record_batch; + std::map::iterator it = records.begin(); + for (; it != records.end(); it++) { + TStreamLoadRecord stream_load_item; + StreamLoadContext::parse_stream_load_record(it->second, stream_load_item); + stream_load_record_batch.emplace(it->first.c_str(), stream_load_item); + } + result.__set_stream_load_record(stream_load_record_batch); + } + } else { + LOG(WARNING) << "stream_load_record is null."; + } +} + } // namespace doris diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h index b5b7adc9c8bec0..41a715eef49bc7 100644 --- a/be/src/service/backend_service.h +++ b/be/src/service/backend_service.h @@ -60,6 +60,7 @@ class TSessionState; class TQueryOptions; class TExportTaskRequest; class TExportStatusResult; +class TStreamLoadRecordResult; // This class just forward rpc for actual handler // make this class because we can bind multiple service on single point @@ -140,6 +141,8 @@ class BackendService : public BackendServiceIf { // used for external service, close some context and release resource related with this context virtual void close_scanner(TScanCloseResult& result_, const TScanCloseParams& params); + virtual void get_stream_load_record(TStreamLoadRecordResult& result, const std::string& params) override; + private: Status start_plan_fragment_execution(const TExecPlanFragmentParams& exec_params); ExecEnv* _exec_env; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index cd54710f846062..17fbcf01ee0dd8 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -157,6 +157,7 @@ import org.apache.doris.load.routineload.RoutineLoadManager; import org.apache.doris.load.routineload.RoutineLoadScheduler; import org.apache.doris.load.routineload.RoutineLoadTaskScheduler; +import org.apache.doris.load.StreamLoadRecordMgr; import org.apache.doris.master.Checkpoint; import org.apache.doris.master.MetaHelper; import org.apache.doris.meta.MetaContext; @@ -297,6 +298,7 @@ public class Catalog { private Load load; private LoadManager loadManager; + private StreamLoadRecordMgr streamLoadRecordMgr; private RoutineLoadManager routineLoadManager; private ExportMgr exportMgr; private Alter alter; @@ -546,6 +548,7 @@ private Catalog(boolean isCheckpointCatalog) { Config.async_loading_load_task_pool_size / 5, !isCheckpointCatalog); this.loadJobScheduler = new LoadJobScheduler(); this.loadManager = new LoadManager(loadJobScheduler); + this.streamLoadRecordMgr = new StreamLoadRecordMgr("stream_load_record_manager", Config.fetch_stream_load_record_interval_second * 1000); this.loadTimeoutChecker = new LoadTimeoutChecker(loadManager); this.loadEtlChecker = new LoadEtlChecker(loadManager); this.loadLoadingChecker = new LoadLoadingChecker(loadManager); @@ -1294,6 +1297,7 @@ private void startMasterOnlyDaemonThreads() { dynamicPartitionScheduler.start(); // start daemon thread to update db used data quota for db txn manager periodly updateDbUsedDataQuotaDaemon.start(); + streamLoadRecordMgr.start(); } // start threads that should running on all FE @@ -4785,6 +4789,10 @@ public LoadManager getLoadManager() { return loadManager; } + public StreamLoadRecordMgr getStreamLoadRecordMgr() { + return streamLoadRecordMgr; + } + public MasterTaskExecutor getPendingLoadTaskScheduler() { return pendingLoadTaskScheduler; } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/AuditLog.java b/fe/fe-core/src/main/java/org/apache/doris/common/AuditLog.java index 235d783f2e6ed2..adc010361029e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/AuditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/AuditLog.java @@ -26,6 +26,7 @@ public class AuditLog { public static final AuditLog SLOW_AUDIT = new AuditLog("audit.slow_query"); public static final AuditLog QUERY_AUDIT = new AuditLog("audit.query"); public static final AuditLog LOAD_AUDIT = new AuditLog("audit.load"); + public static final AuditLog STREAM_LOAD_AUDIT = new AuditLog("audit.stream_load"); private Logger logger; @@ -41,6 +42,10 @@ public static AuditLog getLoadAudit() { return LOAD_AUDIT; } + public static AuditLog getStreamLoadAudit() { + return STREAM_LOAD_AUDIT; + } + public AuditLog(String auditName) { logger = LogManager.getLogger(auditName); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 27206c664a39cc..46c8885325309d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -103,7 +103,7 @@ public class Config extends ConfigBase { */ @ConfField public static String audit_log_dir = PaloFe.DORIS_HOME_DIR + "/log"; @ConfField public static int audit_log_roll_num = 90; - @ConfField public static String[] audit_log_modules = {"slow_query", "query", "load"}; + @ConfField public static String[] audit_log_modules = {"slow_query", "query", "load", "stream_load"}; @ConfField(mutable = true) public static long qe_slow_log_ms = 5000; @ConfField public static String audit_log_roll_interval = "DAY"; @ConfField public static String audit_log_delete_age = "30d"; @@ -584,7 +584,13 @@ public class Config extends ConfigBase { */ @ConfField(mutable = true, masterOnly = true) public static int desired_max_waiting_jobs = 100; - + + /** + * fetch stream load record interval. 
+ */ + @ConfField(mutable = true, masterOnly = true) + public static int fetch_stream_load_record_interval_second = 120; + /** * maximum concurrent running txn num including prepare, commit txns under a single db * txn manager will reject coming txns diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index cec18127f3dbaf..89547b6ad22015 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -43,6 +43,7 @@ import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo; import org.apache.doris.load.loadv2.LoadJobFinalOperation; import org.apache.doris.load.routineload.RoutineLoadJob; +import org.apache.doris.load.StreamLoadRecordMgr.FetchStreamLoadRecord; import org.apache.doris.master.Checkpoint; import org.apache.doris.mysql.privilege.UserPropertyInfo; import org.apache.doris.persist.AlterRoutineLoadJobOperationLog; @@ -508,6 +509,11 @@ public void readFields(DataInput in) throws IOException { isRead = true; break; } + case OperationType.OP_FETCH_STREAM_LOAD_RECORD: { + data = FetchStreamLoadRecord.read(in); + isRead = true; + break; + } case OperationType.OP_CREATE_RESOURCE: { data = Resource.read(in); isRead = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java new file mode 100644 index 00000000000000..2dd0460f5040b9 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package org.apache.doris.load;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.plugin.AuditEvent;
+import org.apache.doris.plugin.AuditEvent.EventType;
+import org.apache.doris.plugin.StreamLoadAuditEvent;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.BackendService;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStreamLoadRecord;
+import org.apache.doris.thrift.TStreamLoadRecordResult;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.ImmutableMap;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+public class StreamLoadRecordMgr extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(StreamLoadRecordMgr.class);
+
+    public StreamLoadRecordMgr(String name, long intervalMs) {
+        super(name, intervalMs);
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        ImmutableMap<Long, Backend> backends = Catalog.getCurrentSystemInfo().getIdToBackend();
+        long start = System.currentTimeMillis();
+        int pullRecordSize = 0;
+        Map<Long, String> beIdToLastStreamLoad = Maps.newHashMap();
+        for (Backend backend : backends.values()) {
+            BackendService.Client client = null;
+            TNetworkAddress address = null;
+            boolean ok = false;
+            try {
+                address = new TNetworkAddress(backend.getHost(), backend.getBePort());
+                client = ClientPool.backendPool.borrowObject(address);
+                TStreamLoadRecordResult result = client.getStreamLoadRecord(backend.getLastStreamLoadTime());
+                Map<String, TStreamLoadRecord> streamLoadRecordBatch = result.getStreamLoadRecord();
+                pullRecordSize += streamLoadRecordBatch.size();
+                String lastStreamLoadTime = "";
+                for (Map.Entry<String, TStreamLoadRecord> entry : streamLoadRecordBatch.entrySet()) {
+                    TStreamLoadRecord streamLoadItem = entry.getValue();
+                    LOG.debug("receive stream load record info from backend: {}. label: {}, db: {}, tbl: {}, user: {}, user_ip: {},"
+                            + " status: {}, message: {}, error_url: {}, total_rows: {}, loaded_rows: {}, filtered_rows: {},"
+                            + " unselected_rows: {}, load_bytes: {}, start_time: {}, finish_time: {}.",
+                            backend.getHost(), streamLoadItem.getLabel(), streamLoadItem.getDb(), streamLoadItem.getTbl(), streamLoadItem.getUser(), streamLoadItem.getUserIp(),
+                            streamLoadItem.getStatus(), streamLoadItem.getMessage(), streamLoadItem.getUrl(), streamLoadItem.getTotalRows(), streamLoadItem.getLoadedRows(),
+                            streamLoadItem.getFilteredRows(), streamLoadItem.getUnselectedRows(), streamLoadItem.getLoadBytes(), streamLoadItem.getStartTime(),
+                            streamLoadItem.getFinishTime());
+
+                    AuditEvent auditEvent = new StreamLoadAuditEvent.AuditEventBuilder().setEventType(EventType.STREAM_LOAD_FINISH)
+                            .setLabel(streamLoadItem.getLabel()).setDb(streamLoadItem.getDb()).setTable(streamLoadItem.getTbl())
+                            .setUser(streamLoadItem.getUser()).setClientIp(streamLoadItem.getUserIp()).setStatus(streamLoadItem.getStatus())
+                            .setMessage(streamLoadItem.getMessage()).setUrl(streamLoadItem.getUrl()).setTotalRows(streamLoadItem.getTotalRows())
+                            .setLoadedRows(streamLoadItem.getLoadedRows()).setFilteredRows(streamLoadItem.getFilteredRows())
+                            .setUnselectedRows(streamLoadItem.getUnselectedRows()).setLoadBytes(streamLoadItem.getLoadBytes())
+                            .setStartTime(streamLoadItem.getStartTime()).setFinishTime(streamLoadItem.getFinishTime())
+                            .build();
+                    Catalog.getCurrentCatalog().getAuditEventProcessor().handleAuditEvent(auditEvent);
+                    if (entry.getKey().compareTo(lastStreamLoadTime) > 0) {
+                        lastStreamLoadTime = entry.getKey();
+                    }
+                }
+                if (streamLoadRecordBatch.size() > 0) {
+                    backend.setLastStreamLoadTime(lastStreamLoadTime);
+                    beIdToLastStreamLoad.put(backend.getId(), lastStreamLoadTime);
+                } else {
+                    beIdToLastStreamLoad.put(backend.getId(), backend.getLastStreamLoadTime());
+                }
+
+                ok = true;
+            } catch (Exception e) {
+                LOG.warn("task exec error. backend[{}]", backend.getId(), e);
+            } finally {
+                if (ok) {
+                    ClientPool.backendPool.returnObject(address, client);
+                } else {
+                    ClientPool.backendPool.invalidateObject(address, client);
+                }
+            }
+        }
+        LOG.info("finished to pull stream load records of all backends. record size: {}, cost: {} ms", pullRecordSize, (System.currentTimeMillis() - start));
+        if (pullRecordSize > 0) {
+            FetchStreamLoadRecord fetchStreamLoadRecord = new FetchStreamLoadRecord(beIdToLastStreamLoad);
+            Catalog.getCurrentCatalog().getEditLog().logFetchStreamLoadRecord(fetchStreamLoadRecord);
+        }
+    }
+
+    public void replayFetchStreamLoadRecord(FetchStreamLoadRecord fetchStreamLoadRecord) {
+        ImmutableMap<Long, Backend> backends = Catalog.getCurrentSystemInfo().getIdToBackend();
+        Map<Long, String> beIdToLastStreamLoad = fetchStreamLoadRecord.getBeIdToLastStreamLoad();
+        for (Backend backend : backends.values()) {
+            String lastStreamLoadTime = beIdToLastStreamLoad.get(backend.getId());
+            if (lastStreamLoadTime != null) {
+                LOG.info("Replay stream load bdbje. backend: {}, last stream load time: {}", backend.getHost(), lastStreamLoadTime);
+                backend.setLastStreamLoadTime(lastStreamLoadTime);
+            }
+        }
+    }
+
+    public static class FetchStreamLoadRecord implements Writable {
+        private Map<Long, String> beIdToLastStreamLoad;
+
+        public FetchStreamLoadRecord(Map<Long, String> beIdToLastStreamLoad) {
+            this.beIdToLastStreamLoad = beIdToLastStreamLoad;
+        }
+
+        public void setBeIdToLastStreamLoad(Map<Long, String> beIdToLastStreamLoad) {
+            this.beIdToLastStreamLoad = beIdToLastStreamLoad;
+        }
+
+        public Map<Long, String> getBeIdToLastStreamLoad() {
+            return beIdToLastStreamLoad;
+        }
+
+        @Override
+        public void write(DataOutput out) throws IOException {
+            for (Map.Entry<Long, String> entry : beIdToLastStreamLoad.entrySet()) {
+                out.writeBoolean(true);
+                out.writeLong(entry.getKey());
+                out.writeBoolean(true);
+                Text.writeString(out, entry.getValue());
+                LOG.debug("Write stream load bdbje. key: {}, value: {} ", entry.getKey(), entry.getValue());
+            }
+        }
+
+        public static FetchStreamLoadRecord read(DataInput in) throws IOException {
+            Map<Long, String> idToLastStreamLoad = Maps.newHashMap();
+            int beNum = Catalog.getCurrentSystemInfo().getIdToBackend().size();
+            for (int i = 0; i < beNum; i++) {
+                long beId = -1;
+                String lastStreamLoad = null;
+                if (in.readBoolean()) {
+                    beId = in.readLong();
+                }
+                if (in.readBoolean()) {
+                    lastStreamLoad = Text.readString(in);
+                }
+                if (beId != -1 && lastStreamLoad != null) {
+                    idToLastStreamLoad.put(beId, lastStreamLoad);
+                }
+                LOG.debug("Read stream load bdbje. key: {}, value: {} ", beId, lastStreamLoad);
+            }
+            FetchStreamLoadRecord fetchStreamLoadRecord = new FetchStreamLoadRecord(idToLastStreamLoad);
+            return fetchStreamLoadRecord;
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 5f7e19418f7b6f..42a240fd8bf98a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -55,6 +55,7 @@
 import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
 import org.apache.doris.load.loadv2.LoadJobFinalOperation;
 import org.apache.doris.load.routineload.RoutineLoadJob;
+import org.apache.doris.load.StreamLoadRecordMgr.FetchStreamLoadRecord;
 import org.apache.doris.meta.MetaContext;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mysql.privilege.UserPropertyInfo;
@@ -691,6 +692,11 @@ public static void loadJournal(Catalog catalog, JournalEntity journal) {
                 catalog.getLoadManager().replayUpdateLoadJobStateInfo(info);
                 break;
             }
+            case OperationType.OP_FETCH_STREAM_LOAD_RECORD: {
+                FetchStreamLoadRecord fetchStreamLoadRecord = (FetchStreamLoadRecord) journal.getData();
+                catalog.getStreamLoadRecordMgr().replayFetchStreamLoadRecord(fetchStreamLoadRecord);
+                break;
+            }
             case OperationType.OP_CREATE_RESOURCE: {
                 final Resource resource = (Resource) journal.getData();
                 catalog.getResourceMgr().replayCreateResource(resource);
@@ -1292,6 +1298,10 @@ public void logUpdateLoadJob(LoadJobStateUpdateInfo info) {
         logEdit(OperationType.OP_UPDATE_LOAD_JOB, info);
     }
 
+    public void logFetchStreamLoadRecord(FetchStreamLoadRecord fetchStreamLoadRecord) {
+        logEdit(OperationType.OP_FETCH_STREAM_LOAD_RECORD, fetchStreamLoadRecord);
+    }
+
     public void logCreateResource(Resource resource) {
         logEdit(OperationType.OP_CREATE_RESOURCE, resource);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index 
08baea3dfe730a..fa18b981a5b7a6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -164,6 +164,8 @@ public class OperationType { public static final short OP_END_LOAD_JOB = 231; // update job info, used by spark load public static final short OP_UPDATE_LOAD_JOB = 232; + // fetch stream load record + public static final short OP_FETCH_STREAM_LOAD_RECORD = 233; // small files 251~260 public static final short OP_CREATE_SMALL_FILE = 251; diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java index c02cdd8c6c6e58..af789184615106 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java @@ -36,7 +36,8 @@ public enum EventType { DISCONNECTION, BEFORE_QUERY, AFTER_QUERY, - LOAD_SUCCEED + LOAD_SUCCEED, + STREAM_LOAD_FINISH } @Retention(RetentionPolicy.RUNTIME) diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/StreamLoadAuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/StreamLoadAuditEvent.java new file mode 100644 index 00000000000000..04b15b9264c958 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/StreamLoadAuditEvent.java @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.plugin; + +public class StreamLoadAuditEvent extends AuditEvent { + + @AuditField(value = "Label") + public String label = ""; + @AuditField(value = "Table") + public String table = ""; + @AuditField(value = "ClientIp") + public String clientIp = ""; + @AuditField(value = "Status") + public String status = ""; + @AuditField(value = "Message") + public String message = ""; + @AuditField(value = "Url") + public String url = ""; + @AuditField(value = "TotalRows") + public long totalRows = -1; + @AuditField(value = "LoadedRows") + public long loadedRows = -1; + @AuditField(value = "FilteredRows") + public long filteredRows = -1; + @AuditField(value = "UnselectedRows") + public long unselectedRows = -1; + @AuditField(value = "LoadBytes") + public long loadBytes = -1; + @AuditField(value = "StartTime") + public String startTime = ""; + @AuditField(value = "FinishTime") + public String finishTime = ""; + + public static class AuditEventBuilder { + + private StreamLoadAuditEvent auditEvent = new StreamLoadAuditEvent(); + + public AuditEventBuilder() { + } + + public void reset() { + auditEvent = new StreamLoadAuditEvent(); + } + + public AuditEventBuilder setEventType(EventType eventType) { + auditEvent.type = eventType; + return this; + } + + public AuditEventBuilder setLabel(String label) { + auditEvent.label = label; + return this; + } + + public AuditEventBuilder setDb(String db) { + auditEvent.db = db; + return this; + } + + public AuditEventBuilder setTable(String table) { + auditEvent.table = table; + return this; + } + + public AuditEventBuilder setUser(String user) { + auditEvent.user = user; + return this; + } + + public AuditEventBuilder setClientIp(String clientIp) { + auditEvent.clientIp = clientIp; + return this; + } + + public AuditEventBuilder setStatus(String status) { + auditEvent.status = status; + return this; + } + + public AuditEventBuilder setMessage(String message) { + auditEvent.message = message; + return this; + } + + public AuditEventBuilder setUrl(String url) { + auditEvent.url = url; + return this; + } + + public AuditEventBuilder setTotalRows(long totalRows) { + auditEvent.totalRows = totalRows; + return this; + } + + public AuditEventBuilder setLoadedRows(long loadedRows) { + auditEvent.loadedRows = loadedRows; + return this; + } + + public AuditEventBuilder setFilteredRows(long filteredRows) { + auditEvent.filteredRows = filteredRows; + return this; + } + + public AuditEventBuilder setUnselectedRows(long unselectedRows) { + auditEvent.unselectedRows = unselectedRows; + return this; + } + + public AuditEventBuilder setLoadBytes(long loadBytes) { + auditEvent.loadBytes = loadBytes; + return this; + } + + public AuditEventBuilder setStartTime(String startTime) { + auditEvent.startTime = startTime; + return this; + } + + public AuditEventBuilder setFinishTime(String finishTime) { + auditEvent.finishTime = finishTime; + return this; + } + + public AuditEvent build() { + return this.auditEvent; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java index 3fd1b0a89731c5..c940e0d2f4d99c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogBuilder.java @@ -51,11 +51,18 @@ public class AuditLogBuilder extends Plugin implements AuditPlugin { private Set loadAnnotationSet; + private final String[] STREAM_LOAD_ANNONATION_NAMES = {"Label", "Db", "Table", "User", 
"ClientIp", + "Status", "Message", "Url", "TotalRows", "LoadedRows", "FilteredRows", "UnselectedRows", + "LoadBytes", "StartTime", "FinishTime"}; + + private Set streamLoadAnnotationSet; + public AuditLogBuilder() { pluginInfo = new PluginInfo(PluginMgr.BUILTIN_PLUGIN_PREFIX + "AuditLogBuilder", PluginType.AUDIT, "builtin audit logger", DigitalVersion.fromString("0.12.0"), DigitalVersion.fromString("1.8.31"), AuditLogBuilder.class.getName(), null, null); loadAnnotationSet = Sets.newHashSet(LOAD_ANNONATION_NAMES); + streamLoadAnnotationSet = Sets.newHashSet(STREAM_LOAD_ANNONATION_NAMES); } public PluginInfo getPluginInfo() { @@ -64,7 +71,7 @@ public PluginInfo getPluginInfo() { @Override public boolean eventFilter(EventType type) { - return type == EventType.AFTER_QUERY || type == EventType.LOAD_SUCCEED; + return type == EventType.AFTER_QUERY || type == EventType.LOAD_SUCCEED || type == EventType.STREAM_LOAD_FINISH; } @Override @@ -77,6 +84,9 @@ public void exec(AuditEvent event) { case LOAD_SUCCEED: auditLoadLog(event); break; + case STREAM_LOAD_FINISH: + auditStreamLoadLog(event); + break; default: break; } @@ -132,4 +142,22 @@ private void auditLoadLog(AuditEvent event) throws IllegalAccessException { String auditLog = sb.toString(); AuditLog.getLoadAudit().log(auditLog); } + + private void auditStreamLoadLog(AuditEvent event) throws IllegalAccessException { + Field[] fields = event.getClass().getFields(); + Map annotationToFieldValueMap = Maps.newHashMap(); + for (Field f : fields) { + AuditField af = f.getAnnotation(AuditField.class); + if (af == null || !streamLoadAnnotationSet.contains(af.value())) { + continue; + } + annotationToFieldValueMap.put(af.value(), String.valueOf(f.get(event))); + } + StringBuilder sb = new StringBuilder(); + for (String annotation : STREAM_LOAD_ANNONATION_NAMES) { + sb.append("|").append(annotation).append("=").append(annotationToFieldValueMap.get(annotation)); + } + String auditLog = sb.toString(); + AuditLog.getStreamLoadAudit().log(auditLog); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index b47506ea23b9e7..ff4fcd3fcc7ae6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -95,6 +95,8 @@ public enum BackendState { // additional backendStatus information for BE, display in JSON format private BackendStatus backendStatus = new BackendStatus(); + private String lastStreamLoadTime = ""; + public Backend() { this.host = ""; this.version = ""; @@ -112,6 +114,8 @@ public Backend() { this.backendState = BackendState.free.ordinal(); this.decommissionType = DecommissionType.SystemDecommission.ordinal(); + + this.lastStreamLoadTime = ""; } public Backend(long id, String host, int heartbeatPort) { @@ -132,6 +136,8 @@ public Backend(long id, String host, int heartbeatPort) { this.ownerClusterName = ""; this.backendState = BackendState.free.ordinal(); this.decommissionType = DecommissionType.SystemDecommission.ordinal(); + + this.lastStreamLoadTime = ""; } public long getId() { @@ -170,6 +176,12 @@ public String getHeartbeatErrMsg() { return heartbeatErrMsg; } + public String getLastStreamLoadTime() { return lastStreamLoadTime; } + + public void setLastStreamLoadTime(String lastStreamLoadTime) { + this.lastStreamLoadTime = lastStreamLoadTime; + } + // for test only public void updateOnce(int bePort, int httpPort, int beRpcPort) { if (this.bePort != bePort) { diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java index b8bb7d93001afa..10942b4e7913ae 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java @@ -45,6 +45,7 @@ import org.apache.doris.thrift.TScanOpenResult; import org.apache.doris.thrift.TSnapshotRequest; import org.apache.doris.thrift.TStatus; +import org.apache.doris.thrift.TStreamLoadRecordResult; import org.apache.doris.thrift.TTabletStatResult; import org.apache.doris.thrift.TTransmitDataParams; import org.apache.doris.thrift.TTransmitDataResult; @@ -218,6 +219,12 @@ public TScanBatchResult getNext(TScanNextBatchParams params) throws TException { public TScanCloseResult closeScanner(TScanCloseParams params) throws TException { return null; } + + @Override + public TStreamLoadRecordResult getStreamLoadRecord(String params) throws TException { + // TODO Auto-generated method stub + return null; + } } @Test diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java index 06b6ecd729a209..84e580f09836cb 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java @@ -65,6 +65,7 @@ import org.apache.doris.thrift.TSnapshotRequest; import org.apache.doris.thrift.TStatus; import org.apache.doris.thrift.TStatusCode; +import org.apache.doris.thrift.TStreamLoadRecordResult; import org.apache.doris.thrift.TTabletStatResult; import org.apache.doris.thrift.TTransmitDataParams; import org.apache.doris.thrift.TTransmitDataResult; @@ -281,6 +282,11 @@ public TScanBatchResult getNext(TScanNextBatchParams params) throws TException { public TScanCloseResult closeScanner(TScanCloseParams params) throws TException { return null; } + + @Override + public TStreamLoadRecordResult getStreamLoadRecord(String params) throws TException { + return new TStreamLoadRecordResult(Maps.newHashMap()); + } } // The default Brpc service. diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index 5b53c7f5a46d5a..75d5c98734b637 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -80,6 +80,31 @@ struct TProxyResult { 2: optional TKafkaMetaProxyResult kafka_meta_result; } +struct TStreamLoadRecord { + 1: optional string cluster + 2: required string user + 3: required string passwd + 4: required string db + 5: required string tbl + 6: optional string user_ip + 7: required string label + 8: required string status + 9: required string message + 10: optional string url + 11: optional i64 auth_code; + 12: required i64 total_rows + 13: required i64 loaded_rows + 14: required i64 filtered_rows + 15: required i64 unselected_rows + 16: required i64 load_bytes + 17: required string start_time + 18: required string finish_time +} + +struct TStreamLoadRecordResult { + 1: required map stream_load_record +} + service BackendService { // Called by coord to start asynchronous execution of plan fragment in backend. // Returns as soon as all incoming data streams have been set up. 
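The RPC added in the next hunk is cursor-based: the FE sends the newest key it has already applied (an empty string on the first pull) and the BE answers with up to stream_load_record_batch_size newer entries. A hedged sketch of the contract a caller can rely on, using names from this patch (last_fetch_key is illustrative):

    // get_batch() semantics: start == "" seeks to the oldest record; otherwise it
    // seeks to `start` and skips it, so each record is returned exactly once.
    std::map<std::string, std::string> records;
    recorder->get_batch(last_fetch_key, config::stream_load_record_batch_size, &records);
    // std::map iterates in key order, and keys are "<finish_micros>_<label>",
    // so the largest key is the cursor to persist for the next pull.
    if (!records.empty()) {
        last_fetch_key = records.rbegin()->first;
    }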
@@ -134,4 +159,6 @@ service BackendService { // release the context resource associated with the context_id DorisExternalService.TScanCloseResult close_scanner(1: DorisExternalService.TScanCloseParams params); + TStreamLoadRecordResult get_stream_load_record(1: string params); + } From 30eae4b3b47298851e3c9da437ebf27b0501cf7a Mon Sep 17 00:00:00 2001 From: weizuo Date: Tue, 6 Apr 2021 14:00:16 +0800 Subject: [PATCH 2/6] add default path for stream_load_recorder --- be/src/common/config.h | 2 +- be/src/http/action/stream_load.cpp | 10 +++++----- be/src/olap/storage_engine.cpp | 20 +++++++++---------- be/src/olap/storage_engine.h | 8 ++++---- be/src/runtime/CMakeLists.txt | 2 +- ...ad_record.cpp => stream_load_recorder.cpp} | 12 +++++------ ...m_load_record.h => stream_load_recorder.h} | 6 +++--- be/src/service/backend_service.cpp | 8 ++++---- gensrc/thrift/BackendService.thrift | 2 +- 9 files changed, 35 insertions(+), 35 deletions(-) rename be/src/runtime/stream_load/{stream_load_record.cpp => stream_load_recorder.cpp} (90%) rename be/src/runtime/stream_load/{stream_load_record.h => stream_load_recorder.h} (93%) diff --git a/be/src/common/config.h b/be/src/common/config.h index 31012f9f4d8c9e..52c542c6ceae53 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -355,7 +355,7 @@ CONF_Int32(stream_load_record_expire_time_secs, "28800"); // time interval to clean expired stream load records CONF_mInt64(clean_stream_load_record_interval_secs, "1800"); // the storage path of stream load record rocksdb -CONF_String(stream_load_record_path, ""); +CONF_String(stream_load_record_path, "${DORIS_HOME}"); // OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc. // You may need to lower the speed when the sink receiver bes are too busy. diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 5db8f9fe1bfac1..e2548f9c05b407 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -49,7 +49,7 @@ #include "runtime/stream_load/stream_load_context.h" #include "runtime/stream_load/stream_load_executor.h" #include "runtime/stream_load/stream_load_pipe.h" -#include "runtime/stream_load/stream_load_record.h" +#include "runtime/stream_load/stream_load_recorder.h" #include "util/byte_buffer.h" #include "util/debug_util.h" #include "util/doris_metrics.h" @@ -540,15 +540,15 @@ Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* file_pa } void StreamLoadAction::_sava_stream_load_record(StreamLoadContext* ctx, const std::string& str) { - auto stream_load_record = StorageEngine::instance()->get_stream_load_record(); - if (stream_load_record != nullptr) { + auto stream_load_recorder = StorageEngine::instance()->get_stream_load_recorder(); + if (stream_load_recorder != nullptr) { std::string key = ToStringFromUnixMicros(ctx->start_micros + ctx->load_cost_micros) + "_" + ctx->label; - auto st = stream_load_record->put(key, str); + auto st = stream_load_recorder->put(key, str); if (st.ok()) { LOG(INFO) << "put stream_load_record rocksdb successfully. label: " << ctx->label << ", key: " << key; } } else { - LOG(WARNING) << "put stream_load_record rocksdb failed. stream_load_record is null."; + LOG(WARNING) << "put stream_load_record rocksdb failed. 
stream_load_recorder is null."; } } diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 9f5764ad64991c..58930c8702bf92 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -121,7 +121,7 @@ StorageEngine::StorageEngine(const EngineOptions& options) _memtable_flush_executor(nullptr), _default_rowset_type(ALPHA_ROWSET), _heartbeat_flags(nullptr), - _stream_load_record(nullptr) { + _stream_load_recorder(nullptr) { if (_s_instance == nullptr) { _s_instance = this; } @@ -226,27 +226,27 @@ Status StorageEngine::_init_store_map() { _store_map.emplace(store->path(), store); } - RETURN_NOT_OK_STATUS_WITH_WARN(_init_stream_load_record(), "init StreamLoadRecord failed"); + RETURN_NOT_OK_STATUS_WITH_WARN(_init_stream_load_recorder(), "init StreamLoadRecorder failed"); return Status::OK(); } -Status StorageEngine::_init_stream_load_record() { +Status StorageEngine::_init_stream_load_recorder() { std::string stream_load_record_path = config::stream_load_record_path; LOG(INFO) << "stream load record path: " << stream_load_record_path; // init stream load record rocksdb - _stream_load_record.reset(new StreamLoadRecord(stream_load_record_path)); - if (_stream_load_record == nullptr) { + _stream_load_recorder.reset(new StreamLoadRecorder(stream_load_record_path)); + if (_stream_load_recorder == nullptr) { RETURN_NOT_OK_STATUS_WITH_WARN( - Status::MemoryAllocFailed("allocate memory for StreamLoadRecord failed"), - "new StreamLoadRecord failed"); + Status::MemoryAllocFailed("allocate memory for StreamLoadRecorder failed"), + "new StreamLoadRecorder failed"); } - auto st = _stream_load_record->init(); + auto st = _stream_load_recorder->init(); if (!st.ok()) { RETURN_NOT_OK_STATUS_WITH_WARN( - Status::IOError(Substitute("open StreamLoadRecord rocksdb failed, path=$0", stream_load_record_path)), - "init StreamLoadRecord failed"); + Status::IOError(Substitute("open StreamLoadRecorder rocksdb failed, path=$0", stream_load_record_path)), + "init StreamLoadRecorder failed"); } return Status::OK(); } diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 2216199c029586..78b8916093616c 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -50,7 +50,7 @@ #include "olap/task/engine_task.h" #include "olap/txn_manager.h" #include "runtime/heartbeat_flags.h" -#include "runtime/stream_load/stream_load_record.h" +#include "runtime/stream_load/stream_load_recorder.h" #include "util/countdown_latch.h" #include "util/thread.h" #include "util/threadpool.h" @@ -175,7 +175,7 @@ class StorageEngine { void create_base_compaction(TabletSharedPtr best_tablet, std::shared_ptr& base_compaction); - std::shared_ptr get_stream_load_record() { return _stream_load_record; } + std::shared_ptr get_stream_load_recorder() { return _stream_load_recorder; } private: // Instance should be inited from `static open()` @@ -245,7 +245,7 @@ class StorageEngine { void _push_tablet_into_submitted_compaction(TabletSharedPtr tablet); void _pop_tablet_from_submitted_compaction(TabletSharedPtr tablet); - Status _init_stream_load_record(); + Status _init_stream_load_recorder(); private: struct CompactionCandidate { @@ -351,7 +351,7 @@ class StorageEngine { std::mutex _compaction_producer_sleep_mutex; std::condition_variable _compaction_producer_sleep_cv; - std::shared_ptr _stream_load_record; + std::shared_ptr _stream_load_recorder; DISALLOW_COPY_AND_ASSIGN(StorageEngine); }; diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt 
index 8004903a6859b5..29e4a33a961cf9 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -91,7 +91,7 @@ set(RUNTIME_FILES message_body_sink.cpp stream_load/stream_load_context.cpp stream_load/stream_load_executor.cpp - stream_load/stream_load_record.cpp + stream_load/stream_load_recorder.cpp stream_load/load_stream_mgr.cpp routine_load/data_consumer.cpp routine_load/data_consumer_group.cpp diff --git a/be/src/runtime/stream_load/stream_load_record.cpp b/be/src/runtime/stream_load/stream_load_recorder.cpp similarity index 90% rename from be/src/runtime/stream_load/stream_load_record.cpp rename to be/src/runtime/stream_load/stream_load_recorder.cpp index caeb472ebbae81..fb44df4867f4ab 100644 --- a/be/src/runtime/stream_load/stream_load_record.cpp +++ b/be/src/runtime/stream_load/stream_load_recorder.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "runtime/stream_load/stream_load_record.h" +#include "runtime/stream_load/stream_load_recorder.h" #include "common/config.h" #include "common/status.h" @@ -31,13 +31,13 @@ namespace doris { const std::string STREAM_LOAD_POSTFIX = "/stream_load"; -StreamLoadRecord::StreamLoadRecord(const std::string& root_path) +StreamLoadRecorder::StreamLoadRecorder(const std::string& root_path) : _root_path(root_path), _db(nullptr), _last_compaction_time(UnixMillis()) { } -StreamLoadRecord::~StreamLoadRecord() { +StreamLoadRecorder::~StreamLoadRecorder() { if (_db != nullptr) { for (auto handle : _handles) { _db->DestroyColumnFamilyHandle(handle); @@ -48,7 +48,7 @@ StreamLoadRecord::~StreamLoadRecord() { } } -Status StreamLoadRecord::init() { +Status StreamLoadRecorder::init() { // init db rocksdb::DBOptions options; options.IncreaseParallelism(); @@ -70,7 +70,7 @@ Status StreamLoadRecord::init() { return Status::OK(); } -Status StreamLoadRecord::put(const std::string& key, const std::string& value) { +Status StreamLoadRecorder::put(const std::string& key, const std::string& value) { rocksdb::ColumnFamilyHandle* handle = _handles[1]; rocksdb::WriteOptions write_options; write_options.sync = false; @@ -90,7 +90,7 @@ Status StreamLoadRecord::put(const std::string& key, const std::string& value) { return Status::OK(); } -Status StreamLoadRecord::get_batch(const std::string& start, const int batch_size, std::map* stream_load_records) { +Status StreamLoadRecorder::get_batch(const std::string& start, const int batch_size, std::map* stream_load_records) { rocksdb::ColumnFamilyHandle* handle = _handles[1]; std::unique_ptr it(_db->NewIterator(rocksdb::ReadOptions(), handle)); if (start == "") { diff --git a/be/src/runtime/stream_load/stream_load_record.h b/be/src/runtime/stream_load/stream_load_recorder.h similarity index 93% rename from be/src/runtime/stream_load/stream_load_record.h rename to be/src/runtime/stream_load/stream_load_recorder.h index a0eee8fe5ac830..c1740b8f9ab901 100644 --- a/be/src/runtime/stream_load/stream_load_record.h +++ b/be/src/runtime/stream_load/stream_load_recorder.h @@ -26,11 +26,11 @@ namespace doris { class Status; -class StreamLoadRecord { +class StreamLoadRecorder { public: - StreamLoadRecord(const std::string& root_path); + StreamLoadRecorder(const std::string& root_path); - virtual ~StreamLoadRecord(); + virtual ~StreamLoadRecorder(); Status init(); diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 4e2727ead2405a..1663778114ceba 100644 --- a/be/src/service/backend_service.cpp +++ 
b/be/src/service/backend_service.cpp @@ -315,10 +315,10 @@ void BackendService::close_scanner(TScanCloseResult& result_, const TScanClosePa } void BackendService::get_stream_load_record(TStreamLoadRecordResult& result, const std::string& params) { - auto stream_load_record = StorageEngine::instance()->get_stream_load_record(); - if (stream_load_record != nullptr) { + auto stream_load_recorder = StorageEngine::instance()->get_stream_load_recorder(); + if (stream_load_recorder != nullptr) { std::map records; - auto st = stream_load_record->get_batch(params, config::stream_load_record_batch_size, &records); + auto st = stream_load_recorder->get_batch(params, config::stream_load_record_batch_size, &records); if (st.ok()) { LOG(INFO) << "get_batch stream_load_record rocksdb successfully. records size: " << records.size(); std::map stream_load_record_batch; @@ -331,7 +331,7 @@ void BackendService::get_stream_load_record(TStreamLoadRecordResult& result, con result.__set_stream_load_record(stream_load_record_batch); } } else { - LOG(WARNING) << "stream_load_record is null."; + LOG(WARNING) << "stream_load_recorder is null."; } } diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index 75d5c98734b637..191d09a18b46e9 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -159,6 +159,6 @@ service BackendService { // release the context resource associated with the context_id DorisExternalService.TScanCloseResult close_scanner(1: DorisExternalService.TScanCloseParams params); - TStreamLoadRecordResult get_stream_load_record(1: string params); + TStreamLoadRecordResult get_stream_load_record(1: string last_stream_record_time); } From ffe44b9f2ed2d0e00b705f26c56a02c58b72d68f Mon Sep 17 00:00:00 2001 From: weizuo Date: Tue, 6 Apr 2021 17:59:40 +0800 Subject: [PATCH 3/6] rename get_stream_load_record parameter to last_stream_record_time --- be/src/service/backend_service.cpp | 4 ++-- be/src/service/backend_service.h | 2 +- .../test/java/org/apache/doris/common/GenericPoolTest.java | 2 +- .../java/org/apache/doris/utframe/MockedBackendFactory.java | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 1663778114ceba..f760c9f06c4394 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -314,11 +314,11 @@ void BackendService::close_scanner(TScanCloseResult& result_, const TScanClosePa result_.status = t_status; } -void BackendService::get_stream_load_record(TStreamLoadRecordResult& result, const std::string& params) { +void BackendService::get_stream_load_record(TStreamLoadRecordResult& result, const std::string& last_stream_record_time) { auto stream_load_recorder = StorageEngine::instance()->get_stream_load_recorder(); if (stream_load_recorder != nullptr) { std::map records; - auto st = stream_load_recorder->get_batch(params, config::stream_load_record_batch_size, &records); + auto st = stream_load_recorder->get_batch(last_stream_record_time, config::stream_load_record_batch_size, &records); if (st.ok()) { LOG(INFO) << "get_batch stream_load_record rocksdb successfully.
records size: " << records.size(); std::map stream_load_record_batch; diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h index 41a715eef49bc7..c150016783ee34 100644 --- a/be/src/service/backend_service.h +++ b/be/src/service/backend_service.h @@ -141,7 +141,7 @@ class BackendService : public BackendServiceIf { // used for external service, close some context and release resource related with this context virtual void close_scanner(TScanCloseResult& result_, const TScanCloseParams& params); - virtual void get_stream_load_record(TStreamLoadRecordResult& result, const std::string& params) override; + virtual void get_stream_load_record(TStreamLoadRecordResult& result, const std::string& last_stream_record_time) override; private: Status start_plan_fragment_execution(const TExecPlanFragmentParams& exec_params); diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java index 10942b4e7913ae..3dc249bb02f2ad 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java @@ -221,7 +221,7 @@ public TScanCloseResult closeScanner(TScanCloseParams params) throws TException } @Override - public TStreamLoadRecordResult getStreamLoadRecord(String params) throws TException { + public TStreamLoadRecordResult getStreamLoadRecord(String last_stream_record_time) throws TException { // TODO Auto-generated method stub return null; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java index 84e580f09836cb..29c6ae1144e58e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java @@ -284,7 +284,7 @@ public TScanCloseResult closeScanner(TScanCloseParams params) throws TException } @Override - public TStreamLoadRecordResult getStreamLoadRecord(String params) throws TException { + public TStreamLoadRecordResult getStreamLoadRecord(String last_stream_record_time) throws TException { return new TStreamLoadRecordResult(Maps.newHashMap()); } } From d9069f4cc1d376efbbd8c6684401b69fff67bf9e Mon Sep 17 00:00:00 2001 From: weizuo Date: Tue, 6 Apr 2021 22:06:14 +0800 Subject: [PATCH 4/6] transfer timestamp as long type by RPC --- be/src/http/action/stream_load.cpp | 2 +- .../stream_load/stream_load_context.cpp | 8 ++-- .../stream_load/stream_load_recorder.cpp | 5 +-- be/src/service/backend_service.cpp | 6 +-- be/src/service/backend_service.h | 2 +- .../doris/load/StreamLoadRecordMgr.java | 42 +++++++++---------- .../java/org/apache/doris/system/Backend.java | 10 ++--- .../apache/doris/common/GenericPoolTest.java | 2 +- .../doris/utframe/MockedBackendFactory.java | 2 +- gensrc/thrift/BackendService.thrift | 6 +-- 10 files changed, 42 insertions(+), 43 deletions(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index e2548f9c05b407..1a0e9f9e754ea1 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -542,7 +542,7 @@ Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* file_pa void StreamLoadAction::_sava_stream_load_record(StreamLoadContext* ctx, const std::string& str) { auto stream_load_recorder = StorageEngine::instance()->get_stream_load_recorder(); if (stream_load_recorder != nullptr) { 
- std::string key = ToStringFromUnixMicros(ctx->start_micros + ctx->load_cost_micros) + "_" + ctx->label; + std::string key = std::to_string(ctx->start_micros + ctx->load_cost_micros) + "_" + ctx->label; auto st = stream_load_recorder->put(key, str); if (st.ok()) { LOG(INFO) << "put stream_load_record rocksdb successfully. label: " << ctx->label << ", key: " << key; diff --git a/be/src/runtime/stream_load/stream_load_context.cpp b/be/src/runtime/stream_load/stream_load_context.cpp index fbdc364fa80b6a..a68a3eb384402e 100644 --- a/be/src/runtime/stream_load/stream_load_context.cpp +++ b/be/src/runtime/stream_load/stream_load_context.cpp @@ -79,9 +79,9 @@ std::string StreamLoadContext::to_json() const { writer.Key("LoadBytes"); writer.Int64(receive_bytes); writer.Key("StartTime"); - writer.String(ToStringFromUnixMicros(start_micros).c_str()); + writer.Int64(start_micros); writer.Key("FinishTime"); - writer.String(ToStringFromUnixMicros(start_micros + load_cost_micros).c_str()); + writer.Int64(start_micros + load_cost_micros); writer.Key("LoadTimeMs"); writer.Int64(load_cost_micros / 1000); writer.Key("BeginTxnTimeMs"); @@ -179,12 +179,12 @@ void StreamLoadContext::parse_stream_load_record(const std::string& stream_load_ if (document.HasMember("StartTime")) { const rapidjson::Value& start_time = document["StartTime"]; - stream_load_item.__set_start_time(start_time.GetString()); + stream_load_item.__set_start_time(start_time.GetInt64()); } if (document.HasMember("FinishTime")) { const rapidjson::Value& finish_time = document["FinishTime"]; - stream_load_item.__set_finish_time(finish_time.GetString()); + stream_load_item.__set_finish_time(finish_time.GetInt64()); } } diff --git a/be/src/runtime/stream_load/stream_load_recorder.cpp b/be/src/runtime/stream_load/stream_load_recorder.cpp index fb44df4867f4ab..443825bb7706ad 100644 --- a/be/src/runtime/stream_load/stream_load_recorder.cpp +++ b/be/src/runtime/stream_load/stream_load_recorder.cpp @@ -93,12 +93,11 @@ Status StreamLoadRecorder::put(const std::string& key, const std::string& value) Status StreamLoadRecorder::get_batch(const std::string& start, const int batch_size, std::map* stream_load_records) { rocksdb::ColumnFamilyHandle* handle = _handles[1]; std::unique_ptr it(_db->NewIterator(rocksdb::ReadOptions(), handle)); - if (start == "") { + if (start == "-1") { it->SeekToFirst(); } else { it->Seek(start); - rocksdb::Status status = it->status(); - if (status.ok()) { + if (it->Valid()) { it->Next(); } else { it->SeekToFirst(); diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index f760c9f06c4394..dcc13ba998f5ee 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -314,13 +314,13 @@ void BackendService::close_scanner(TScanCloseResult& result_, const TScanClosePa result_.status = t_status; } -void BackendService::get_stream_load_record(TStreamLoadRecordResult& result, const std::string& last_stream_record_time) { +void BackendService::get_stream_load_record(TStreamLoadRecordResult& result, const int64_t last_stream_record_time) { auto stream_load_recorder = StorageEngine::instance()->get_stream_load_recorder(); if (stream_load_recorder != nullptr) { std::map records; - auto st = stream_load_recorder->get_batch(last_stream_record_time, config::stream_load_record_batch_size, &records); + auto st = stream_load_recorder->get_batch(std::to_string(last_stream_record_time), config::stream_load_record_batch_size, &records); if (st.ok()) { - LOG(INFO) << "get_batch 
stream_load_record rocksdb successfully. records size: " << records.size(); + LOG(INFO) << "get_batch stream_load_record rocksdb successfully. records size: " << records.size() << ", last_timestamp: " << last_stream_record_time; std::map stream_load_record_batch; std::map::iterator it = records.begin(); for (; it != records.end(); it++) { diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h index c150016783ee34..166a1366d82b04 100644 --- a/be/src/service/backend_service.h +++ b/be/src/service/backend_service.h @@ -141,7 +141,7 @@ class BackendService : public BackendServiceIf { // used for external service, close some context and release resource related with this context virtual void close_scanner(TScanCloseResult& result_, const TScanCloseParams& params); - virtual void get_stream_load_record(TStreamLoadRecordResult& result, const std::string& last_stream_record_time) override; + virtual void get_stream_load_record(TStreamLoadRecordResult& result, const int64_t last_stream_record_time) override; private: Status start_plan_fragment_execution(const TExecPlanFragmentParams& exec_params); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java index 2dd0460f5040b9..7989ae21762b28 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java @@ -19,7 +19,6 @@ import org.apache.doris.catalog.Catalog; import org.apache.doris.common.ClientPool; -import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.plugin.AuditEvent; @@ -53,7 +52,7 @@ protected void runAfterCatalogReady() { ImmutableMap backends = Catalog.getCurrentSystemInfo().getIdToBackend(); long start = System.currentTimeMillis(); int pullRecordSize = 0; - Map beIdToLastStreamLoad = Maps.newHashMap(); + Map beIdToLastStreamLoad = Maps.newHashMap(); for (Backend backend : backends.values()) { BackendService.Client client = null; TNetworkAddress address = null; @@ -64,7 +63,7 @@ protected void runAfterCatalogReady() { TStreamLoadRecordResult result = client.getStreamLoadRecord(backend.getLastStreamLoadTime()); Map streamLoadRecordBatch = result.getStreamLoadRecord(); pullRecordSize += streamLoadRecordBatch.size(); - String lastStreamLoadTime = ""; + long lastStreamLoadTime = -1; for (Map.Entry entry : streamLoadRecordBatch.entrySet()) { TStreamLoadRecord streamLoadItem= entry.getValue(); LOG.debug("receive stream load record info from backend: {}. 
label: {}, db: {}, tbl: {}, user: {}, user_ip: {}," + @@ -79,13 +78,13 @@ protected void runAfterCatalogReady() { .setLabel(streamLoadItem.getLabel()).setDb(streamLoadItem.getDb()).setTable(streamLoadItem.getTbl()) .setUser(streamLoadItem.getUser()).setClientIp(streamLoadItem.getUserIp()).setStatus(streamLoadItem.getStatus()) .setMessage(streamLoadItem.getMessage()).setUrl(streamLoadItem.getUrl()).setTotalRows(streamLoadItem.getTotalRows()) - .setLoadedRows( streamLoadItem.getLoadedRows()).setFilteredRows(streamLoadItem.getFilteredRows()) + .setLoadedRows(streamLoadItem.getLoadedRows()).setFilteredRows(streamLoadItem.getFilteredRows()) .setUnselectedRows(streamLoadItem.getUnselectedRows()).setLoadBytes(streamLoadItem.getLoadBytes()) - .setStartTime(streamLoadItem.getStartTime()).setFinishTime(streamLoadItem.getFinishTime()) + .setStartTime(String.valueOf(streamLoadItem.getStartTime())).setFinishTime(String.valueOf(streamLoadItem.getFinishTime())) .build(); Catalog.getCurrentCatalog().getAuditEventProcessor().handleAuditEvent(auditEvent); - if (entry.getKey().compareTo(lastStreamLoadTime) > 0) { - lastStreamLoadTime = entry.getKey(); + if (entry.getValue().getFinishTime() > lastStreamLoadTime) { + lastStreamLoadTime = entry.getValue().getFinishTime(); } } if (streamLoadRecordBatch.size() > 0) { @@ -106,7 +105,8 @@ protected void runAfterCatalogReady() { } } } - LOG.info("finished to pull stream load records of all backends. record size: {}, cost: {} ms", pullRecordSize, (System.currentTimeMillis() - start)); + LOG.info("finished to pull stream load records of all backends. record size: {}, cost: {} ms", + pullRecordSize, (System.currentTimeMillis() - start)); if (pullRecordSize > 0) { FetchStreamLoadRecord fetchStreamLoadRecord = new FetchStreamLoadRecord(beIdToLastStreamLoad); Catalog.getCurrentCatalog().getEditLog().logFetchStreamLoadRecord(fetchStreamLoadRecord); @@ -115,10 +115,10 @@ protected void runAfterCatalogReady() { public void replayFetchStreamLoadRecord(FetchStreamLoadRecord fetchStreamLoadRecord) { ImmutableMap backends = Catalog.getCurrentSystemInfo().getIdToBackend(); - Map beIdToLastStreamLoad = fetchStreamLoadRecord.getBeIdToLastStreamLoad(); + Map beIdToLastStreamLoad = fetchStreamLoadRecord.getBeIdToLastStreamLoad(); for (Backend backend : backends.values()) { - String lastStreamLoadTime = beIdToLastStreamLoad.get(backend.getId()); - if (lastStreamLoadTime != null) { + if (beIdToLastStreamLoad.containsKey(backend.getId())) { + long lastStreamLoadTime = beIdToLastStreamLoad.get(backend.getId()); LOG.info("Replay stream load bdbje. 
backend: {}, last stream load version: {}", backend.getHost(), lastStreamLoadTime); backend.setLastStreamLoadTime(lastStreamLoadTime); } @@ -126,44 +126,44 @@ public void replayFetchStreamLoadRecord(FetchStreamLoadRecord fetchStreamLoadRec } public static class FetchStreamLoadRecord implements Writable { - private Map beIdToLastStreamLoad; + private Map beIdToLastStreamLoad; - public FetchStreamLoadRecord(Map beIdToLastStreamLoad) { + public FetchStreamLoadRecord(Map beIdToLastStreamLoad) { this.beIdToLastStreamLoad = beIdToLastStreamLoad; } - public void setBeIdToLastStreamLoad(Map beIdToLastStreamLoad) { + public void setBeIdToLastStreamLoad(Map beIdToLastStreamLoad) { this.beIdToLastStreamLoad = beIdToLastStreamLoad; } - public Map getBeIdToLastStreamLoad() { + public Map getBeIdToLastStreamLoad() { return beIdToLastStreamLoad; } @Override public void write(DataOutput out) throws IOException { - for (Map.Entry entry : beIdToLastStreamLoad.entrySet()) { + for (Map.Entry entry : beIdToLastStreamLoad.entrySet()) { out.writeBoolean(true); out.writeLong(entry.getKey()); out.writeBoolean(true); - Text.writeString(out, entry.getValue()); + out.writeLong(entry.getValue()); LOG.debug("Write stream load bdbje. key: {}, value: {} ", entry.getKey(), entry.getValue()); } } public static FetchStreamLoadRecord read(DataInput in) throws IOException { - Map idToLastStreamLoad = Maps.newHashMap(); + Map idToLastStreamLoad = Maps.newHashMap(); int beNum = Catalog.getCurrentSystemInfo().getIdToBackend().size(); for (int i = 0; i < beNum; i++) { long beId = -1; - String lastStreamLoad = null; + long lastStreamLoad = -1; if (in.readBoolean()) { beId = in.readLong(); } if (in.readBoolean()) { - lastStreamLoad = Text.readString(in); + lastStreamLoad = in.readLong(); } - if (beId != -1 && lastStreamLoad != null) { + if (beId != -1 && lastStreamLoad != -1) { idToLastStreamLoad.put(beId, lastStreamLoad); } LOG.debug("Read stream load bdbje. 
key: {}, value: {} ", beId, lastStreamLoad); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java index ff4fcd3fcc7ae6..bfe1ce9ce93729 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java @@ -95,7 +95,7 @@ public enum BackendState { // additional backendStatus information for BE, display in JSON format private BackendStatus backendStatus = new BackendStatus(); - private String lastStreamLoadTime = ""; + private long lastStreamLoadTime = -1; public Backend() { this.host = ""; @@ -115,7 +115,7 @@ public Backend() { this.decommissionType = DecommissionType.SystemDecommission.ordinal(); - this.lastStreamLoadTime = ""; + this.lastStreamLoadTime = -1; } public Backend(long id, String host, int heartbeatPort) { @@ -137,7 +137,7 @@ public Backend(long id, String host, int heartbeatPort) { this.backendState = BackendState.free.ordinal(); this.decommissionType = DecommissionType.SystemDecommission.ordinal(); - this.lastStreamLoadTime = ""; + this.lastStreamLoadTime = -1; } public long getId() { @@ -176,9 +176,9 @@ public String getHeartbeatErrMsg() { return heartbeatErrMsg; } - public String getLastStreamLoadTime() { return lastStreamLoadTime; } + public long getLastStreamLoadTime() { return lastStreamLoadTime; } - public void setLastStreamLoadTime(String lastStreamLoadTime) { + public void setLastStreamLoadTime(long lastStreamLoadTime) { this.lastStreamLoadTime = lastStreamLoadTime; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java index 3dc249bb02f2ad..b8a6152151e35b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java @@ -221,7 +221,7 @@ public TScanCloseResult closeScanner(TScanCloseParams params) throws TException } @Override - public TStreamLoadRecordResult getStreamLoadRecord(String last_stream_record_time) throws TException { + public TStreamLoadRecordResult getStreamLoadRecord(long last_stream_record_time) throws TException { // TODO Auto-generated method stub return null; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java index 29c6ae1144e58e..64a833cfb890de 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java @@ -284,7 +284,7 @@ public TScanCloseResult closeScanner(TScanCloseParams params) throws TException } @Override - public TStreamLoadRecordResult getStreamLoadRecord(String last_stream_record_time) throws TException { + public TStreamLoadRecordResult getStreamLoadRecord(long last_stream_record_time) throws TException { return new TStreamLoadRecordResult(Maps.newHashMap()); } } diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index 191d09a18b46e9..200a8ead956534 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -97,8 +97,8 @@ struct TStreamLoadRecord { 14: required i64 filtered_rows 15: required i64 unselected_rows 16: required i64 load_bytes - 17: required string start_time - 18: required string finish_time + 17: required i64 start_time + 18: required i64 finish_time } struct 
TStreamLoadRecordResult { @@ -159,6 +159,6 @@ service BackendService { // release the context resource associated with the context_id DorisExternalService.TScanCloseResult close_scanner(1: DorisExternalService.TScanCloseParams params); - TStreamLoadRecordResult get_stream_load_record(1: string last_stream_record_time); + TStreamLoadRecordResult get_stream_load_record(1: i64 last_stream_record_time); } From 492aa678667b366a0129cf98efb567cbc18efcdf Mon Sep 17 00:00:00 2001 From: weizuo Date: Wed, 7 Apr 2021 20:43:15 +0800 Subject: [PATCH 5/6] transfer last stream load timestamp as int64 type from FE to BE by RPC --- be/src/http/action/stream_load.cpp | 8 +- be/src/runtime/CMakeLists.txt | 2 +- .../routine_load_task_executor.cpp | 2 +- .../stream_load/stream_load_context.cpp | 84 +++++++++++++++---- .../runtime/stream_load/stream_load_context.h | 7 +- .../stream_load/stream_load_executor.cpp | 2 +- .../stream_load/stream_load_recorder.cpp | 17 ++-- .../stream_load/stream_load_recorder.h | 5 +- be/src/service/backend_service.cpp | 2 +- .../doris/load/StreamLoadRecordMgr.java | 10 ++- 10 files changed, 97 insertions(+), 42 deletions(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 1a0e9f9e754ea1..f1f90e9250a4de 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -145,7 +145,7 @@ void StreamLoadAction::handle(HttpRequest* req) { << ", errmsg=" << ctx->status.get_error_msg(); } } - ctx->load_cost_micros = UnixMicros() - ctx->start_micros; + ctx->load_cost_millis = UnixMillis() - ctx->start_millis; if (!ctx->status.ok() && ctx->status.code() != TStatusCode::PUBLISH_TIMEOUT) { if (ctx->need_rollback) { @@ -162,11 +162,12 @@ void StreamLoadAction::handle(HttpRequest* req) { str = str + '\n'; HttpChannel::send_reply(req, str); + str = ctx->prepare_stream_load_record(str); _sava_stream_load_record(ctx, str); // update statstics streaming_load_requests_total->increment(1); - streaming_load_duration_ms->increment(ctx->load_cost_micros / 1000); + streaming_load_duration_ms->increment(ctx->load_cost_millis); streaming_load_bytes->increment(ctx->receive_bytes); streaming_load_current_processing->increment(-1); } @@ -233,6 +234,7 @@ int StreamLoadAction::on_header(HttpRequest* req) { str = str + '\n'; HttpChannel::send_reply(req, str); + str = ctx->prepare_stream_load_record(str); _sava_stream_load_record(ctx, str); return -1; @@ -542,7 +544,7 @@ Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* file_pa void StreamLoadAction::_sava_stream_load_record(StreamLoadContext* ctx, const std::string& str) { auto stream_load_recorder = StorageEngine::instance()->get_stream_load_recorder(); if (stream_load_recorder != nullptr) { - std::string key = std::to_string(ctx->start_micros + ctx->load_cost_micros) + "_" + ctx->label; + std::string key = std::to_string(ctx->start_millis + ctx->load_cost_millis) + "_" + ctx->label; auto st = stream_load_recorder->put(key, str); if (st.ok()) { LOG(INFO) << "put stream_load_record rocksdb successfully. 
label: " << ctx->label << ", key: " << key; diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 29e4a33a961cf9..47bea84a28b970 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -91,7 +91,7 @@ set(RUNTIME_FILES message_body_sink.cpp stream_load/stream_load_context.cpp stream_load/stream_load_executor.cpp - stream_load/stream_load_recorder.cpp + stream_load/stream_load_recorder.cpp stream_load/load_stream_mgr.cpp routine_load/data_consumer.cpp routine_load/data_consumer_group.cpp diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp index 353ce7aff94ff2..661ef57cee93e6 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.cpp +++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp @@ -256,7 +256,7 @@ void RoutineLoadTaskExecutor::exec_task(StreamLoadContext* ctx, DataConsumerPool // wait for all consumers finished HANDLE_ERROR(ctx->future.get(), "consume failed"); - ctx->load_cost_micros = UnixMicros() - ctx->start_micros; + ctx->load_cost_millis = UnixMillis() - ctx->start_millis; // return the consumer back to pool // call this before commit txn, in case the next task can come very fast diff --git a/be/src/runtime/stream_load/stream_load_context.cpp b/be/src/runtime/stream_load/stream_load_context.cpp index a68a3eb384402e..21cf3159666d87 100644 --- a/be/src/runtime/stream_load/stream_load_context.cpp +++ b/be/src/runtime/stream_load/stream_load_context.cpp @@ -17,6 +17,8 @@ #include "runtime/stream_load/stream_load_context.h" +#include + namespace doris { std::string StreamLoadContext::to_json() const { @@ -31,16 +33,6 @@ std::string StreamLoadContext::to_json() const { // label writer.Key("Label"); writer.String(label.c_str()); - writer.Key("cluster"); - writer.String(auth.cluster.c_str()); - writer.Key("Db"); - writer.String(db.c_str()); - writer.Key("Table"); - writer.String(table.c_str()); - writer.Key("User"); - writer.String(auth.user.c_str()); - writer.Key("ClientIp"); - writer.String(auth.user_ip.c_str()); // status writer.Key("Status"); @@ -78,12 +70,8 @@ std::string StreamLoadContext::to_json() const { writer.Int64(number_unselected_rows); writer.Key("LoadBytes"); writer.Int64(receive_bytes); - writer.Key("StartTime"); - writer.Int64(start_micros); - writer.Key("FinishTime"); - writer.Int64(start_micros + load_cost_micros); writer.Key("LoadTimeMs"); - writer.Int64(load_cost_micros / 1000); + writer.Int64(load_cost_millis); writer.Key("BeginTxnTimeMs"); writer.Int64(begin_txn_cost_nanos / 1000000); writer.Key("StreamLoadPutTimeMs"); @@ -103,89 +91,155 @@ std::string StreamLoadContext::to_json() const { return s.GetString(); } +std::string StreamLoadContext::prepare_stream_load_record(const std::string& stream_load_record) { + rapidjson::Document document; + if (document.Parse(stream_load_record.data()).HasParseError()) { + LOG(WARNING) << "prepare stream load record failed. failed to parse json returned to client. 
label=" << label; + return ""; + } + rapidjson::Document::AllocatorType& allocator = document.GetAllocator(); + + rapidjson::Value cluster_value(rapidjson::kStringType); + cluster_value.SetString(auth.cluster.c_str(), auth.cluster.size()); + if(!cluster_value.IsNull()) { + document.AddMember("cluster", cluster_value, allocator); + } + + rapidjson::Value db_value(rapidjson::kStringType); + db_value.SetString(db.c_str(), db.size()); + if(!db_value.IsNull()) { + document.AddMember("Db", db_value, allocator); + } + + rapidjson::Value table_value(rapidjson::kStringType); + table_value.SetString(table.c_str(), table.size()); + if(!table_value.IsNull()) { + document.AddMember("Table", table_value, allocator); + } + + rapidjson::Value user_value(rapidjson::kStringType); + user_value.SetString(auth.user.c_str(), auth.user.size()); + if(!user_value.IsNull()) { + document.AddMember("User", user_value, allocator); + } + + rapidjson::Value client_ip_value(rapidjson::kStringType); + client_ip_value.SetString(auth.user_ip.c_str(), auth.user_ip.size()); + if(!client_ip_value.IsNull()) { + document.AddMember("ClientIp", client_ip_value, allocator); + } + + document.AddMember("StartTime", start_millis, allocator); + document.AddMember("FinishTime", start_millis + load_cost_millis, allocator); + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + document.Accept(writer); + return buffer.GetString(); +} + void StreamLoadContext::parse_stream_load_record(const std::string& stream_load_record, TStreamLoadRecord& stream_load_item) { rapidjson::Document document; + std::stringstream ss; if (document.Parse(stream_load_record.data()).HasParseError()) { + LOG(WARNING) << "failed to parse json from rocksdb."; return; } if (document.HasMember("Label")) { const rapidjson::Value& label = document["Label"]; stream_load_item.__set_label(label.GetString()); + ss << "Label: " << label.GetString(); } if (document.HasMember("Db")) { const rapidjson::Value& db = document["Db"]; stream_load_item.__set_db(db.GetString()); + ss << ", Db: " << db.GetString(); } if (document.HasMember("Table")) { const rapidjson::Value& table = document["Table"]; stream_load_item.__set_tbl(table.GetString()); + ss << ", Table: " << table.GetString(); } if (document.HasMember("User")) { const rapidjson::Value& user = document["User"]; stream_load_item.__set_user(user.GetString()); + ss << ", User: " << user.GetString(); } if (document.HasMember("ClientIp")) { const rapidjson::Value& client_ip = document["ClientIp"]; stream_load_item.__set_user_ip(client_ip.GetString()); + ss << ", ClientIp: " << client_ip.GetString(); } if (document.HasMember("Status")) { const rapidjson::Value& status = document["Status"]; stream_load_item.__set_status(status.GetString()); + ss << ", Status: " << status.GetString(); } if (document.HasMember("Message")) { const rapidjson::Value& message = document["Message"]; stream_load_item.__set_message(message.GetString()); + ss << ", Message: " << message.GetString(); } if (document.HasMember("ErrorURL")) { const rapidjson::Value& error_url = document["ErrorURL"]; stream_load_item.__set_url(error_url.GetString()); + ss << ", ErrorURL: " << error_url.GetString(); } else { stream_load_item.__set_url("N/A"); + ss << ", ErrorURL: N/A"; } if (document.HasMember("NumberTotalRows")) { const rapidjson::Value& total_rows = document["NumberTotalRows"]; stream_load_item.__set_total_rows(total_rows.GetInt()); + ss << ", NumberTotalRows: " << total_rows.GetInt(); } if (document.HasMember("NumberLoadedRows")) { const 
rapidjson::Value& loaded_rows = document["NumberLoadedRows"]; stream_load_item.__set_loaded_rows(loaded_rows.GetInt()); + ss << ", NumberLoadedRows: " << loaded_rows.GetInt(); } if (document.HasMember("NumberFilteredRows")) { const rapidjson::Value& filtered_rows = document["NumberFilteredRows"]; stream_load_item.__set_filtered_rows(filtered_rows.GetInt()); + ss << ", NumberFilteredRows: " << filtered_rows.GetInt64(); } if (document.HasMember("NumberUnselectedRows")) { const rapidjson::Value& unselected_rows = document["NumberUnselectedRows"]; stream_load_item.__set_unselected_rows(unselected_rows.GetInt()); + ss << ", NumberUnselectedRows: " << unselected_rows.GetInt64(); } if (document.HasMember("LoadBytes")) { const rapidjson::Value& load_bytes = document["LoadBytes"]; stream_load_item.__set_load_bytes(load_bytes.GetInt()); + ss << ", LoadBytes: " << load_bytes.GetInt64(); } if (document.HasMember("StartTime")) { const rapidjson::Value& start_time = document["StartTime"]; stream_load_item.__set_start_time(start_time.GetInt64()); + ss << ", StartTime: " << start_time.GetInt64(); } if (document.HasMember("FinishTime")) { const rapidjson::Value& finish_time = document["FinishTime"]; stream_load_item.__set_finish_time(finish_time.GetInt64()); + ss << ", FinishTime: " << finish_time.GetInt64(); } + + VLOG(1) << "parse json from rocksdb. " << ss.str(); } /* diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index adc4e0583431d8..aeca593605c762 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -83,7 +83,7 @@ class MessageBodySink; class StreamLoadContext { public: StreamLoadContext(ExecEnv* exec_env) : id(UniqueId::gen_uid()), _exec_env(exec_env), _refs(0) { - start_micros = UnixMicros(); + start_millis = UnixMillis(); } ~StreamLoadContext() { @@ -97,6 +97,7 @@ class StreamLoadContext { std::string to_json() const; + std::string prepare_stream_load_record(const std::string& stream_load_record); static void parse_stream_load_record(const std::string& stream_load_record, TStreamLoadRecord& stream_load_item); // the old mini load result format is not same as stream load. 
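[Editor's note] For reviewers less familiar with rapidjson, prepare_stream_load_record() above is an instance of the usual parse / AddMember / reserialize pattern: the JSON string already sent to the client is re-parsed, the audit-only fields (cluster, Db, Table, User, ClientIp, StartTime, FinishTime) are appended, and the document is serialized again before being written to rocksdb. A minimal self-contained sketch of that pattern, with illustrative names rather than the exact Doris code:

    #include <cstdint>
    #include <string>
    #include <rapidjson/document.h>
    #include <rapidjson/stringbuffer.h>
    #include <rapidjson/writer.h>

    // Sketch: append audit-only fields to an existing JSON payload and
    // reserialize it. An empty return value means "do not persist".
    std::string augment_record(const std::string& json, const std::string& db,
                               int64_t start_ms, int64_t cost_ms) {
        rapidjson::Document doc;
        if (doc.Parse(json.c_str()).HasParseError()) {
            return "";
        }
        rapidjson::Document::AllocatorType& alloc = doc.GetAllocator();
        // Deep-copy the string so the Value owns its own buffer.
        rapidjson::Value db_val(db.c_str(), static_cast<rapidjson::SizeType>(db.size()), alloc);
        doc.AddMember("Db", db_val, alloc);
        doc.AddMember("StartTime", start_ms, alloc);
        doc.AddMember("FinishTime", start_ms + cost_ms, alloc);
        rapidjson::StringBuffer buf;
        rapidjson::Writer<rapidjson::StringBuffer> writer(buf);
        doc.Accept(writer);
        return buf.GetString();
    }

One caveat: a rapidjson Value that has just had SetString() called on it is of string type, so the IsNull() guards in the patch above can never be true and those AddMember calls always run; actually skipping empty auth fields would need a check on the source string, e.g. auth.cluster.empty().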
@@ -172,9 +173,9 @@ class StreamLoadContext { int64_t number_filtered_rows = 0; int64_t number_unselected_rows = 0; int64_t loaded_bytes = 0; - int64_t start_micros = 0; + int64_t start_millis = 0; int64_t start_write_data_nanos = 0; - int64_t load_cost_micros = 0; + int64_t load_cost_millis = 0; int64_t begin_txn_cost_nanos = 0; int64_t stream_load_put_cost_nanos = 0; int64_t commit_and_publish_txn_cost_nanos = 0; diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 6ce64be7b749b4..3c9a237b7ebc70 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -275,7 +275,7 @@ bool StreamLoadExecutor::collect_load_stat(StreamLoadContext* ctx, TTxnCommitAtt rl_attach.__set_unselectedRows(ctx->number_unselected_rows); rl_attach.__set_receivedBytes(ctx->receive_bytes); rl_attach.__set_loadedBytes(ctx->loaded_bytes); - rl_attach.__set_loadCostMs(ctx->load_cost_micros / 1000); + rl_attach.__set_loadCostMs(ctx->load_cost_millis); attach->rlTaskTxnCommitAttachment = std::move(rl_attach); attach->__isset.rlTaskTxnCommitAttachment = true; diff --git a/be/src/runtime/stream_load/stream_load_recorder.cpp b/be/src/runtime/stream_load/stream_load_recorder.cpp index 443825bb7706ad..7029bc50af9234 100644 --- a/be/src/runtime/stream_load/stream_load_recorder.cpp +++ b/be/src/runtime/stream_load/stream_load_recorder.cpp @@ -44,7 +44,7 @@ StreamLoadRecorder::~StreamLoadRecorder() { handle = nullptr; } delete _db; - _db= nullptr; + _db = nullptr; } } @@ -58,31 +58,28 @@ Status StreamLoadRecorder::init() { std::vector column_families; // default column family is required column_families.emplace_back(DEFAULT_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions()); - // stream load column family - column_families.emplace_back(STREAM_LOAD_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions()); - std::vector ttls = {config::stream_load_record_expire_time_secs, config::stream_load_record_expire_time_secs}; + std::vector ttls = {config::stream_load_record_expire_time_secs}; rocksdb::Status s = rocksdb::DBWithTTL::Open(options, db_path, column_families, &_handles, &_db, ttls); - if (!s.ok() || _db == nullptr) { LOG(WARNING) << "rocks db open failed, reason:" << s.ToString(); - return Status::InternalError("Stream load record rocksdb open failed"); + return Status::InternalError("Stream load record rocksdb open failed, reason: " + s.ToString()); } return Status::OK(); } Status StreamLoadRecorder::put(const std::string& key, const std::string& value) { - rocksdb::ColumnFamilyHandle* handle = _handles[1]; + rocksdb::ColumnFamilyHandle* handle = _handles[0]; rocksdb::WriteOptions write_options; write_options.sync = false; rocksdb::Status s = _db->Put(write_options, handle, rocksdb::Slice(key), rocksdb::Slice(value)); if (!s.ok()) { LOG(WARNING) << "rocks db put key:" << key << " failed, reason:" << s.ToString(); - return Status::InternalError("Stream load record rocksdb put failed"); + return Status::InternalError("Stream load record rocksdb put failed, reason: " + s.ToString()); } if ((UnixMillis() - _last_compaction_time) / 1000 > config::clean_stream_load_record_interval_secs) { rocksdb::CompactRangeOptions options; - s = _db->CompactRange(options, _handles[1], nullptr, nullptr); + s = _db->CompactRange(options, _handles[0], nullptr, nullptr); if (s.ok()) { _last_compaction_time = UnixMillis(); } @@ -91,7 +88,7 @@ Status StreamLoadRecorder::put(const std::string& key, const std::string& value) } Status 
StreamLoadRecorder::get_batch(const std::string& start, const int batch_size, std::map* stream_load_records) { - rocksdb::ColumnFamilyHandle* handle = _handles[1]; + rocksdb::ColumnFamilyHandle* handle = _handles[0]; std::unique_ptr it(_db->NewIterator(rocksdb::ReadOptions(), handle)); if (start == "-1") { it->SeekToFirst(); diff --git a/be/src/runtime/stream_load/stream_load_recorder.h b/be/src/runtime/stream_load/stream_load_recorder.h index c1740b8f9ab901..8cd5da32e01362 100644 --- a/be/src/runtime/stream_load/stream_load_recorder.h +++ b/be/src/runtime/stream_load/stream_load_recorder.h @@ -18,6 +18,7 @@ #include #include +#include "olap/utils.h" #include "rocksdb/utilities/db_ttl.h" #pragma once @@ -43,15 +44,13 @@ class StreamLoadRecorder { rocksdb::DBWithTTL* _db; std::vector _handles; - int64_t _last_compaction_time; + AtomicInt64 _last_compaction_time; enum ColumnFamilyIndex { DEFAULT_COLUMN_FAMILY_INDEX = 0, - STREAM_LOAD_COLUMN_FAMILY_INDEX }; const std::string DEFAULT_COLUMN_FAMILY = "default"; - const std::string STREAM_LOAD_COLUMN_FAMILY = "stream_load"; }; } // namespace doris diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index dcc13ba998f5ee..1153c9d260d819 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -320,7 +320,7 @@ void BackendService::get_stream_load_record(TStreamLoadRecordResult& result, con std::map records; auto st = stream_load_recorder->get_batch(std::to_string(last_stream_record_time), config::stream_load_record_batch_size, &records); if (st.ok()) { - LOG(INFO) << "get_batch stream_load_record rocksdb successfully. records size: " << records.size() << ", last_timestamp: " << last_stream_record_time; + LOG(INFO) << "get_batch stream_load_record rocksdb successfully. records size: " << records.size() << ", last_stream_load_timestamp: " << last_stream_record_time; std::map stream_load_record_batch; std::map::iterator it = records.begin(); for (; it != records.end(); it++) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java index 7989ae21762b28..b1375667e2bcce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadRecordMgr.java @@ -21,6 +21,7 @@ import org.apache.doris.common.ClientPool; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.plugin.AuditEvent; import org.apache.doris.plugin.AuditEvent.EventType; import org.apache.doris.plugin.StreamLoadAuditEvent; @@ -38,6 +39,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.text.SimpleDateFormat; import java.util.Map; public class StreamLoadRecordMgr extends MasterDaemon { @@ -66,13 +68,14 @@ protected void runAfterCatalogReady() { long lastStreamLoadTime = -1; for (Map.Entry entry : streamLoadRecordBatch.entrySet()) { TStreamLoadRecord streamLoadItem= entry.getValue(); + String startTime = TimeUtils.longToTimeString(streamLoadItem.getStartTime(), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")); + String finishTime = TimeUtils.longToTimeString(streamLoadItem.getFinishTime(), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")); LOG.debug("receive stream load record info from backend: {}. 
label: {}, db: {}, tbl: {}, user: {}, user_ip: {}," + " status: {}, message: {}, error_url: {}, total_rows: {}, loaded_rows: {}, filtered_rows: {}," + " unselected_rows: {}, load_bytes: {}, start_time: {}, finish_time: {}.", backend.getHost(), streamLoadItem.getLabel(), streamLoadItem.getDb(), streamLoadItem.getTbl(), streamLoadItem.getUser(), streamLoadItem.getUserIp(), streamLoadItem.getStatus(), streamLoadItem.getMessage(), streamLoadItem.getUrl(), streamLoadItem.getTotalRows(), streamLoadItem.getLoadedRows(), - streamLoadItem.getFilteredRows(), streamLoadItem.getUnselectedRows(), streamLoadItem.getLoadBytes(), streamLoadItem.getStartTime(), - streamLoadItem.getFinishTime()); + streamLoadItem.getFilteredRows(), streamLoadItem.getUnselectedRows(), streamLoadItem.getLoadBytes(), startTime, finishTime); AuditEvent auditEvent = new StreamLoadAuditEvent.AuditEventBuilder().setEventType(EventType.STREAM_LOAD_FINISH) .setLabel(streamLoadItem.getLabel()).setDb(streamLoadItem.getDb()).setTable(streamLoadItem.getTbl()) @@ -80,8 +83,7 @@ protected void runAfterCatalogReady() { .setMessage(streamLoadItem.getMessage()).setUrl(streamLoadItem.getUrl()).setTotalRows(streamLoadItem.getTotalRows()) .setLoadedRows(streamLoadItem.getLoadedRows()).setFilteredRows(streamLoadItem.getFilteredRows()) .setUnselectedRows(streamLoadItem.getUnselectedRows()).setLoadBytes(streamLoadItem.getLoadBytes()) - .setStartTime(String.valueOf(streamLoadItem.getStartTime())).setFinishTime(String.valueOf(streamLoadItem.getFinishTime())) - .build(); + .setStartTime(startTime).setFinishTime(finishTime).build(); Catalog.getCurrentCatalog().getAuditEventProcessor().handleAuditEvent(auditEvent); if (entry.getValue().getFinishTime() > lastStreamLoadTime) { lastStreamLoadTime = entry.getValue().getFinishTime(); From 1c54c56ac9b8a18aefcaa150b7dd007c32c73d8d Mon Sep 17 00:00:00 2001 From: weizuo Date: Wed, 7 Apr 2021 21:02:09 +0800 Subject: [PATCH 6/6] restore streaming_load_current_processing decrement in stream load on_header --- be/src/http/action/stream_load.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index f1f90e9250a4de..bc73dc9d66c59c 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -233,6 +233,7 @@ int StreamLoadAction::on_header(HttpRequest* req) { // add new line at end str = str + '\n'; HttpChannel::send_reply(req, str); + streaming_load_current_processing->increment(-1); str = ctx->prepare_stream_load_record(str); _sava_stream_load_record(ctx, str);
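[Editor's note] For context on the BE-side storage, the whole series builds on rocksdb's TTL database: each record is a plain key/value put whose key is "<finish_millis>_<label>", so a lexicographic scan returns records in finish-time order, and expiry is enforced during compaction. A minimal sketch of the open/put/scan pattern involved; the path and keys are illustrative, while 28800 s and a batch of 50 mirror the stream_load_record_expire_time_secs and stream_load_record_batch_size defaults added in patch 1:

    #include <cstdio>
    #include <memory>
    #include <string>
    #include <rocksdb/utilities/db_ttl.h>

    int main() {
        rocksdb::Options options;
        options.create_if_missing = true;
        rocksdb::DBWithTTL* db = nullptr;
        // Entries become stale ttl seconds after their last write; they are
        // physically removed during compaction, not at the moment they expire.
        rocksdb::Status s = rocksdb::DBWithTTL::Open(options, "/tmp/stream_load_demo",
                                                     &db, 28800 /* ttl secs */);
        if (!s.ok()) {
            std::fprintf(stderr, "open failed: %s\n", s.ToString().c_str());
            return 1;
        }
        // Keys start with a 13-digit epoch-millisecond timestamp, so rocksdb's
        // lexicographic key order matches numeric time order.
        db->Put(rocksdb::WriteOptions(), "1617800000000_label_a", "{\"Label\":\"label_a\"}");
        db->Put(rocksdb::WriteOptions(), "1617800001000_label_b", "{\"Label\":\"label_b\"}");

        // Cursor-based batch read, as in get_stream_load_record(): seek to the
        // last key the FE has seen, skip it, then return up to batch_size entries.
        std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(rocksdb::ReadOptions()));
        it->Seek("1617800000000_label_a");
        if (it->Valid()) {
            it->Next();
        }
        for (int n = 0; it->Valid() && n < 50; it->Next(), ++n) {
            std::printf("%s => %s\n", it->key().ToString().c_str(),
                        it->value().ToString().c_str());
        }
        delete db;
        return 0;
    }

This is also why StreamLoadRecorder::put() issues a periodic CompactRange(): DBWithTTL only drops expired entries when their files are compacted, so a lightly written database would otherwise keep records well past stream_load_record_expire_time_secs.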