Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,12 @@ DEFINE_mInt32(slave_replica_writer_rpc_timeout_sec, "60");
// Whether to enable stream load record function, the default is false.
// False: disable stream load record
DEFINE_mBool(enable_stream_load_record, "false");
// Whether to enable stream load record to audit log table, the default is true.
DEFINE_mBool(enable_stream_load_record_to_audit_log_table, "false");
// the maximum bytes of a batch of stream load records to audit log table
DEFINE_mInt64(stream_load_record_batch_bytes, "104857600"); // 100MB
// the interval to send a batch of stream load records to audit log table
DEFINE_mInt64(stream_load_record_batch_interval_secs, "120"); // 2 minutes
// batch size of stream load record reported to FE
DEFINE_mInt32(stream_load_record_batch_size, "50");
// expire time of stream load record in rocksdb.
Expand Down
6 changes: 6 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,12 @@ DECLARE_mInt32(slave_replica_writer_rpc_timeout_sec);
// Whether to enable stream load record function, the default is false.
// False: disable stream load record
DECLARE_mBool(enable_stream_load_record);
// Whether to enable stream load record to audit log table, the default is true.
DECLARE_mBool(enable_stream_load_record_to_audit_log_table);
// the maximum bytes of a batch of stream load records to audit log table
DECLARE_mInt64(stream_load_record_batch_bytes);
// the interval to send a batch of stream load records to audit log table
DECLARE_mInt64(stream_load_record_batch_interval_secs);
// batch size of stream load record reported to FE
DECLARE_mInt32(stream_load_record_batch_size);
// expire time of stream load record in rocksdb.
Expand Down
17 changes: 11 additions & 6 deletions be/src/http/action/http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,11 @@ void HttpStreamAction::handle(HttpRequest* req) {
// add new line at end
str = str + '\n';
HttpChannel::send_reply(req, str);
if (config::enable_stream_load_record) {
str = ctx->prepare_stream_load_record(str);
_save_stream_load_record(ctx, str);
if (config::enable_stream_load_record || config::enable_stream_load_record_to_audit_log_table) {
if (req->header(HTTP_SKIP_RECORD_TO_AUDIT_LOG_TABLE).empty()) {
str = ctx->prepare_stream_load_record(str);
_save_stream_load_record(ctx, str);
}
}
// update statistics
http_stream_requests_total->increment(1);
Expand Down Expand Up @@ -176,9 +178,12 @@ int HttpStreamAction::on_header(HttpRequest* req) {
// add new line at end
str = str + '\n';
HttpChannel::send_reply(req, str);
if (config::enable_stream_load_record) {
str = ctx->prepare_stream_load_record(str);
_save_stream_load_record(ctx, str);
if (config::enable_stream_load_record ||
config::enable_stream_load_record_to_audit_log_table) {
if (req->header(HTTP_SKIP_RECORD_TO_AUDIT_LOG_TABLE).empty()) {
str = ctx->prepare_stream_load_record(str);
_save_stream_load_record(ctx, str);
}
}
return -1;
}
Expand Down
17 changes: 11 additions & 6 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,11 @@ void StreamLoadAction::handle(HttpRequest* req) {
str = str + '\n';
HttpChannel::send_reply(req, str);
#ifndef BE_TEST
if (config::enable_stream_load_record) {
str = ctx->prepare_stream_load_record(str);
_save_stream_load_record(ctx, str);
if (config::enable_stream_load_record || config::enable_stream_load_record_to_audit_log_table) {
if (req->header(HTTP_SKIP_RECORD_TO_AUDIT_LOG_TABLE).empty()) {
str = ctx->prepare_stream_load_record(str);
_save_stream_load_record(ctx, str);
}
}
#endif

Expand Down Expand Up @@ -238,9 +240,12 @@ int StreamLoadAction::on_header(HttpRequest* req) {
str = str + '\n';
HttpChannel::send_reply(req, str);
#ifndef BE_TEST
if (config::enable_stream_load_record) {
str = ctx->prepare_stream_load_record(str);
_save_stream_load_record(ctx, str);
if (config::enable_stream_load_record ||
config::enable_stream_load_record_to_audit_log_table) {
if (req->header(HTTP_SKIP_RECORD_TO_AUDIT_LOG_TABLE).empty()) {
str = ctx->prepare_stream_load_record(str);
_save_stream_load_record(ctx, str);
}
}
#endif
return -1;
Expand Down
6 changes: 6 additions & 0 deletions be/src/http/http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,12 @@ Status HttpClient::execute_post_request(const std::string& payload, std::string*
return execute(response);
}

Status HttpClient::execute_put_request(const std::string& payload, std::string* response) {
set_payload(payload);
curl_easy_setopt(_curl, CURLOPT_CUSTOMREQUEST, "PUT");
return execute(response);
}

Status HttpClient::execute_delete_request(const std::string& payload, std::string* response) {
set_method(DELETE);
set_payload(payload);
Expand Down
2 changes: 2 additions & 0 deletions be/src/http/http_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ class HttpClient {

Status execute_post_request(const std::string& payload, std::string* response);

Status execute_put_request(const std::string& payload, std::string* response);

Status execute_delete_request(const std::string& payload, std::string* response);

// execute a simple method, and its response is saved in response argument
Expand Down
1 change: 1 addition & 0 deletions be/src/http/http_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,6 @@ static const std::string HTTP_GROUP_COMMIT = "group_commit";
static const std::string HTTP_CLOUD_CLUSTER = "cloud_cluster";
static const std::string HTTP_EMPTY_FIELD_AS_NULL = "empty_field_as_null";
static const std::string HTTP_COMPUTE_GROUP = "compute_group";
static const std::string HTTP_SKIP_RECORD_TO_AUDIT_LOG_TABLE = "skip_record_to_audit_log_table";

} // namespace doris
3 changes: 3 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include "common/config.h"
#include "common/status.h"
#include "exec/schema_scanner/schema_routine_load_job_scanner.h"
#include "io/cache/fs_file_cache_storage.h"
#include "olap/memtable_memory_limiter.h"
#include "olap/options.h"
Expand Down Expand Up @@ -105,6 +106,7 @@ class LoadStreamMgr;
class LoadStreamMapPool;
class StreamLoadExecutor;
class RoutineLoadTaskExecutor;
class StreamLoadRecorderManager;
class SmallFileMgr;
class BackendServiceClient;
class TPaloBrokerServiceClient;
Expand Down Expand Up @@ -490,6 +492,7 @@ class ExecEnv {

std::unique_ptr<StreamLoadExecutor> _stream_load_executor;
RoutineLoadTaskExecutor* _routine_load_task_executor = nullptr;
StreamLoadRecorderManager* _stream_load_recorder_manager = nullptr;
SmallFileMgr* _small_file_mgr = nullptr;
HeartbeatFlags* _heartbeat_flags = nullptr;
vectorized::ScannerScheduler* _scanner_scheduler = nullptr;
Expand Down
7 changes: 7 additions & 0 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
#include "runtime/small_file_mgr.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/stream_load/stream_load_executor.h"
#include "runtime/stream_load/stream_load_recorder_manager.h"
#include "runtime/thread_context.h"
#include "runtime/user_function_cache.h"
#include "runtime/workload_group/workload_group_manager.h"
Expand Down Expand Up @@ -397,6 +398,10 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
return st;
}

// should start after storage_engine->open()
_stream_load_recorder_manager = new StreamLoadRecorderManager();
_stream_load_recorder_manager->start();

// create internal workload group should be after storage_engin->open()
RETURN_IF_ERROR(_create_internal_workload_group());
_workload_sched_mgr = new WorkloadSchedPolicyMgr();
Expand Down Expand Up @@ -770,6 +775,7 @@ void ExecEnv::destroy() {
SAFE_STOP(_group_commit_mgr);
// _routine_load_task_executor should be stopped before _new_load_stream_mgr.
SAFE_STOP(_routine_load_task_executor);
SAFE_STOP(_stream_load_recorder_manager);
// stop workload scheduler
SAFE_STOP(_workload_sched_mgr);
// stop pipline step 2, cgroup execution
Expand Down Expand Up @@ -837,6 +843,7 @@ void ExecEnv::destroy() {
SAFE_DELETE(_file_meta_cache);
SAFE_DELETE(_group_commit_mgr);
SAFE_DELETE(_routine_load_task_executor);
SAFE_DELETE(_stream_load_recorder_manager);
// _stream_load_executor
SAFE_DELETE(_function_client_cache);
SAFE_DELETE(_streaming_client_cache);
Expand Down
35 changes: 29 additions & 6 deletions be/src/runtime/stream_load/stream_load_recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include <rocksdb/status.h>

#include <memory>
#include <ostream>

#include "common/config.h"
#include "common/status.h"
Expand Down Expand Up @@ -63,7 +62,9 @@ Status StreamLoadRecorder::init() {
std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
// default column family is required
column_families.emplace_back(DEFAULT_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions());
std::vector<int32_t> ttls = {config::stream_load_record_expire_time_secs};
// meta column family is used for bookkeeping data such as last fetch keys and should not expire
column_families.emplace_back(META_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions());
std::vector<int32_t> ttls = {config::stream_load_record_expire_time_secs, 0};

rocksdb::DBWithTTL* tmp = _db.release();
rocksdb::Status s =
Expand All @@ -78,8 +79,9 @@ Status StreamLoadRecorder::init() {
return Status::OK();
}

Status StreamLoadRecorder::put(const std::string& key, const std::string& value) {
rocksdb::ColumnFamilyHandle* handle = _handles[0];
Status StreamLoadRecorder::put(const std::string& key, const std::string& value, bool use_meta_cf) {
rocksdb::ColumnFamilyHandle* handle =
_handles[use_meta_cf ? META_COLUMN_FAMILY_INDEX : DEFAULT_COLUMN_FAMILY_INDEX];
rocksdb::WriteOptions write_options;
write_options.sync = false;
rocksdb::Status s = _db->Put(write_options, handle, rocksdb::Slice(key), rocksdb::Slice(value));
Expand All @@ -89,20 +91,41 @@ Status StreamLoadRecorder::put(const std::string& key, const std::string& value)
s.ToString());
}

if (!use_meta_cf) {
return Status::OK();
}

if ((UnixMillis() - _last_compaction_time) / 1000 >
config::clean_stream_load_record_interval_secs) {
rocksdb::CompactRangeOptions options;
s = _db->CompactRange(options, _handles[0], nullptr, nullptr);
s = _db->CompactRange(options, _handles[DEFAULT_COLUMN_FAMILY_INDEX], nullptr, nullptr);
if (s.ok()) {
_last_compaction_time = UnixMillis();
}
}

return Status::OK();
}

Status StreamLoadRecorder::get(const std::string& key, std::string* value, bool use_meta_cf) {
rocksdb::ColumnFamilyHandle* handle =
_handles[use_meta_cf ? META_COLUMN_FAMILY_INDEX : DEFAULT_COLUMN_FAMILY_INDEX];
rocksdb::ReadOptions read_options;
rocksdb::Status s = _db->Get(read_options, handle, rocksdb::Slice(key), value);
if (s.IsNotFound()) {
return Status::NotFound("Key not found: {}", key);
}
if (!s.ok()) {
LOG(WARNING) << "rocks db get key:" << key << " failed, reason:" << s.ToString();
return Status::InternalError("Stream load record rocksdb get failed, reason: {}",
s.ToString());
}
return Status::OK();
}

Status StreamLoadRecorder::get_batch(const std::string& start, int batch_size,
std::map<std::string, std::string>* stream_load_records) {
rocksdb::ColumnFamilyHandle* handle = _handles[0];
rocksdb::ColumnFamilyHandle* handle = _handles[DEFAULT_COLUMN_FAMILY_INDEX];
std::unique_ptr<rocksdb::Iterator> it(_db->NewIterator(rocksdb::ReadOptions(), handle));
if (start == "-1") {
it->SeekToFirst();
Expand Down
6 changes: 5 additions & 1 deletion be/src/runtime/stream_load/stream_load_recorder.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ class StreamLoadRecorder {

Status init();

Status put(const std::string& key, const std::string& value);
Status put(const std::string& key, const std::string& value, bool use_meta_cf = false);

Status get(const std::string& key, std::string* value, bool use_meta_cf = false);

Status get_batch(const std::string& start, int batch_size,
std::map<std::string, std::string>* stream_load_records);
Expand All @@ -59,9 +61,11 @@ class StreamLoadRecorder {

enum ColumnFamilyIndex {
DEFAULT_COLUMN_FAMILY_INDEX = 0,
META_COLUMN_FAMILY_INDEX = 1,
};

const std::string DEFAULT_COLUMN_FAMILY = "default";
const std::string META_COLUMN_FAMILY = "meta";
};

} // namespace doris
Loading
Loading