From 0b233c8ee8111282b879cad075e0229dc4d96473 Mon Sep 17 00:00:00 2001 From: laihui Date: Wed, 12 Nov 2025 11:23:50 +0800 Subject: [PATCH 1/4] enable stream load record to audit log system table --- be/src/common/config.cpp | 6 + be/src/common/config.h | 6 + be/src/http/action/http_stream.cpp | 17 +- be/src/http/action/stream_load.cpp | 17 +- be/src/http/http_client.cpp | 6 + be/src/http/http_client.h | 2 + be/src/http/http_common.h | 1 + be/src/runtime/exec_env.h | 3 + be/src/runtime/exec_env_init.cpp | 7 + .../stream_load/stream_load_recorder.cpp | 15 + .../stream_load/stream_load_recorder.h | 2 + .../stream_load_recorder_manager.cpp | 324 ++++++++++++++++++ .../stream_load_recorder_manager.h | 98 ++++++ .../apache/doris/catalog/InternalSchema.java | 3 +- .../doris/plugin/audit/AuditStreamLoader.java | 1 + .../test_stream_load_audit_log.groovy | 77 +++++ .../manager/test_manager_interface_1.groovy | 2 +- 17 files changed, 573 insertions(+), 14 deletions(-) create mode 100644 be/src/runtime/stream_load/stream_load_recorder_manager.cpp create mode 100644 be/src/runtime/stream_load/stream_load_recorder_manager.h create mode 100644 regression-test/suites/load_p0/stream_load/test_stream_load_audit_log.groovy diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index c95ee3968a2c47..2d3e284cecc1dd 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -627,6 +627,12 @@ DEFINE_mInt32(slave_replica_writer_rpc_timeout_sec, "60"); // Whether to enable stream load record function, the default is false. // False: disable stream load record DEFINE_mBool(enable_stream_load_record, "false"); +// Whether to enable stream load record to audit log table, the default is true. +DEFINE_mBool(enable_stream_load_record_to_audit_log_table, "true"); +// the maximum bytes of a batch of stream load records to audit log table +DEFINE_mInt64(stream_load_record_batch_bytes, "104857600"); // 100MB +// the interval to send a batch of stream load records to audit log table +DEFINE_mInt64(stream_load_record_batch_interval_secs, "120"); // 2 minutes // batch size of stream load record reported to FE DEFINE_mInt32(stream_load_record_batch_size, "50"); // expire time of stream load record in rocksdb. diff --git a/be/src/common/config.h b/be/src/common/config.h index 8549c2d7dfe262..24fab865d1df1e 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -648,6 +648,12 @@ DECLARE_mInt32(slave_replica_writer_rpc_timeout_sec); // Whether to enable stream load record function, the default is false. // False: disable stream load record DECLARE_mBool(enable_stream_load_record); +// Whether to enable stream load record to audit log table, the default is true. +DECLARE_mBool(enable_stream_load_record_to_audit_log_table); +// the maximum bytes of a batch of stream load records to audit log table +DECLARE_mInt64(stream_load_record_batch_bytes); +// the interval to send a batch of stream load records to audit log table +DECLARE_mInt64(stream_load_record_batch_interval_secs); // batch size of stream load record reported to FE DECLARE_mInt32(stream_load_record_batch_size); // expire time of stream load record in rocksdb. diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp index 4e41362f8a22c7..c47a22164ecd3d 100644 --- a/be/src/http/action/http_stream.cpp +++ b/be/src/http/action/http_stream.cpp @@ -113,9 +113,11 @@ void HttpStreamAction::handle(HttpRequest* req) { // add new line at end str = str + '\n'; HttpChannel::send_reply(req, str); - if (config::enable_stream_load_record) { - str = ctx->prepare_stream_load_record(str); - _save_stream_load_record(ctx, str); + if (config::enable_stream_load_record || config::enable_stream_load_record_to_audit_log_table) { + if (req->header(HTTP_SKIP_RECORD_TO_AUDIT_LOG_TABLE).empty()) { + str = ctx->prepare_stream_load_record(str); + _save_stream_load_record(ctx, str); + } } // update statistics http_stream_requests_total->increment(1); @@ -176,9 +178,12 @@ int HttpStreamAction::on_header(HttpRequest* req) { // add new line at end str = str + '\n'; HttpChannel::send_reply(req, str); - if (config::enable_stream_load_record) { - str = ctx->prepare_stream_load_record(str); - _save_stream_load_record(ctx, str); + if (config::enable_stream_load_record || + config::enable_stream_load_record_to_audit_log_table) { + if (req->header(HTTP_SKIP_RECORD_TO_AUDIT_LOG_TABLE).empty()) { + str = ctx->prepare_stream_load_record(str); + _save_stream_load_record(ctx, str); + } } return -1; } diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 59641d4a934332..ee660d9a1bf617 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -131,9 +131,11 @@ void StreamLoadAction::handle(HttpRequest* req) { str = str + '\n'; HttpChannel::send_reply(req, str); #ifndef BE_TEST - if (config::enable_stream_load_record) { - str = ctx->prepare_stream_load_record(str); - _save_stream_load_record(ctx, str); + if (config::enable_stream_load_record || config::enable_stream_load_record_to_audit_log_table) { + if (req->header(HTTP_SKIP_RECORD_TO_AUDIT_LOG_TABLE).empty()) { + str = ctx->prepare_stream_load_record(str); + _save_stream_load_record(ctx, str); + } } #endif @@ -238,9 +240,12 @@ int StreamLoadAction::on_header(HttpRequest* req) { str = str + '\n'; HttpChannel::send_reply(req, str); #ifndef BE_TEST - if (config::enable_stream_load_record) { - str = ctx->prepare_stream_load_record(str); - _save_stream_load_record(ctx, str); + if (config::enable_stream_load_record || + config::enable_stream_load_record_to_audit_log_table) { + if (req->header(HTTP_SKIP_RECORD_TO_AUDIT_LOG_TABLE).empty()) { + str = ctx->prepare_stream_load_record(str); + _save_stream_load_record(ctx, str); + } } #endif return -1; diff --git a/be/src/http/http_client.cpp b/be/src/http/http_client.cpp index b11df1e833b821..cb52e74206e1c5 100644 --- a/be/src/http/http_client.cpp +++ b/be/src/http/http_client.cpp @@ -409,6 +409,12 @@ Status HttpClient::execute_post_request(const std::string& payload, std::string* return execute(response); } +Status HttpClient::execute_put_request(const std::string& payload, std::string* response) { + set_payload(payload); + curl_easy_setopt(_curl, CURLOPT_CUSTOMREQUEST, "PUT"); + return execute(response); +} + Status HttpClient::execute_delete_request(const std::string& payload, std::string* response) { set_method(DELETE); set_payload(payload); diff --git a/be/src/http/http_client.h b/be/src/http/http_client.h index 7447ff9a7515cb..d7f8142cf4b8dd 100644 --- a/be/src/http/http_client.h +++ b/be/src/http/http_client.h @@ -158,6 +158,8 @@ class HttpClient { Status execute_post_request(const std::string& payload, std::string* response); + Status execute_put_request(const std::string& payload, std::string* response); + Status execute_delete_request(const std::string& payload, std::string* response); // execute a simple method, and its response is saved in response argument diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h index 37037757f2158b..f250e22a278e55 100644 --- a/be/src/http/http_common.h +++ b/be/src/http/http_common.h @@ -73,5 +73,6 @@ static const std::string HTTP_GROUP_COMMIT = "group_commit"; static const std::string HTTP_CLOUD_CLUSTER = "cloud_cluster"; static const std::string HTTP_EMPTY_FIELD_AS_NULL = "empty_field_as_null"; static const std::string HTTP_COMPUTE_GROUP = "compute_group"; +static const std::string HTTP_SKIP_RECORD_TO_AUDIT_LOG_TABLE = "skip_record_to_audit_log_table"; } // namespace doris diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 1f68b435a5b70d..aca469c2c8f109 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -30,6 +30,7 @@ #include "common/config.h" #include "common/status.h" +#include "exec/schema_scanner/schema_routine_load_job_scanner.h" #include "io/cache/fs_file_cache_storage.h" #include "olap/memtable_memory_limiter.h" #include "olap/options.h" @@ -105,6 +106,7 @@ class LoadStreamMgr; class LoadStreamMapPool; class StreamLoadExecutor; class RoutineLoadTaskExecutor; +class StreamLoadRecorderManager; class SmallFileMgr; class BackendServiceClient; class TPaloBrokerServiceClient; @@ -484,6 +486,7 @@ class ExecEnv { std::unique_ptr _stream_load_executor; RoutineLoadTaskExecutor* _routine_load_task_executor = nullptr; + StreamLoadRecorderManager* _stream_load_recorder_manager = nullptr; SmallFileMgr* _small_file_mgr = nullptr; HeartbeatFlags* _heartbeat_flags = nullptr; vectorized::ScannerScheduler* _scanner_scheduler = nullptr; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index e77b67f766efd1..2d10f356309bb5 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -93,6 +93,7 @@ #include "runtime/small_file_mgr.h" #include "runtime/stream_load/new_load_stream_mgr.h" #include "runtime/stream_load/stream_load_executor.h" +#include "runtime/stream_load/stream_load_recorder_manager.h" #include "runtime/thread_context.h" #include "runtime/user_function_cache.h" #include "runtime/workload_group/workload_group_manager.h" @@ -392,6 +393,10 @@ Status ExecEnv::_init(const std::vector& store_paths, return st; } + // should start after storage_engine->open() + _stream_load_recorder_manager = new StreamLoadRecorderManager(); + _stream_load_recorder_manager->start(); + // create internal workload group should be after storage_engin->open() RETURN_IF_ERROR(_create_internal_workload_group()); _workload_sched_mgr = new WorkloadSchedPolicyMgr(); @@ -765,6 +770,7 @@ void ExecEnv::destroy() { SAFE_STOP(_group_commit_mgr); // _routine_load_task_executor should be stopped before _new_load_stream_mgr. SAFE_STOP(_routine_load_task_executor); + SAFE_STOP(_stream_load_recorder_manager); // stop workload scheduler SAFE_STOP(_workload_sched_mgr); // stop pipline step 2, cgroup execution @@ -831,6 +837,7 @@ void ExecEnv::destroy() { SAFE_DELETE(_file_meta_cache); SAFE_DELETE(_group_commit_mgr); SAFE_DELETE(_routine_load_task_executor); + SAFE_DELETE(_stream_load_recorder_manager); // _stream_load_executor SAFE_DELETE(_function_client_cache); SAFE_DELETE(_streaming_client_cache); diff --git a/be/src/runtime/stream_load/stream_load_recorder.cpp b/be/src/runtime/stream_load/stream_load_recorder.cpp index 65f8acb9a0fdee..03aff056900a1d 100644 --- a/be/src/runtime/stream_load/stream_load_recorder.cpp +++ b/be/src/runtime/stream_load/stream_load_recorder.cpp @@ -100,6 +100,21 @@ Status StreamLoadRecorder::put(const std::string& key, const std::string& value) return Status::OK(); } +Status StreamLoadRecorder::get(const std::string& key, std::string* value) { + rocksdb::ColumnFamilyHandle* handle = _handles[0]; + rocksdb::ReadOptions read_options; + rocksdb::Status s = _db->Get(read_options, handle, rocksdb::Slice(key), value); + if (s.IsNotFound()) { + return Status::NotFound("Key not found: {}", key); + } + if (!s.ok()) { + LOG(WARNING) << "rocks db get key:" << key << " failed, reason:" << s.ToString(); + return Status::InternalError("Stream load record rocksdb get failed, reason: {}", + s.ToString()); + } + return Status::OK(); +} + Status StreamLoadRecorder::get_batch(const std::string& start, int batch_size, std::map* stream_load_records) { rocksdb::ColumnFamilyHandle* handle = _handles[0]; diff --git a/be/src/runtime/stream_load/stream_load_recorder.h b/be/src/runtime/stream_load/stream_load_recorder.h index cc1ce22b353319..702382bb93b9cb 100644 --- a/be/src/runtime/stream_load/stream_load_recorder.h +++ b/be/src/runtime/stream_load/stream_load_recorder.h @@ -47,6 +47,8 @@ class StreamLoadRecorder { Status put(const std::string& key, const std::string& value); + Status get(const std::string& key, std::string* value); + Status get_batch(const std::string& start, int batch_size, std::map* stream_load_records); diff --git a/be/src/runtime/stream_load/stream_load_recorder_manager.cpp b/be/src/runtime/stream_load/stream_load_recorder_manager.cpp new file mode 100644 index 00000000000000..2457efae2aeaa6 --- /dev/null +++ b/be/src/runtime/stream_load/stream_load_recorder_manager.cpp @@ -0,0 +1,324 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/stream_load/stream_load_recorder_manager.h" + +#include +#include +#include +#include + +#include +#include +#include + +#include "common/config.h" +#include "common/status.h" +#include "http/http_client.h" +#include "olap/storage_engine.h" +#include "runtime/exec_env.h" +#include "runtime/memory/mem_tracker_limiter.h" +#include "runtime/stream_load/stream_load_recorder.h" +#include "runtime/thread_context.h" +#include "util/time.h" + +namespace doris { + +static constexpr int64_t DEFAULT_STREAM_LOAD_TIMEOUT_SEC = 600; // 10 minutes +static constexpr const char* DEFAULT_INTERNAL_DB_NAME = "__internal_schema"; +static constexpr const char* STREAM_LOAD_RECORD_TABLE = "audit_log"; +static constexpr const char* COLUMN_SEPARATOR = "\t"; +static constexpr const char* LINE_DELIMITER = "\n"; +// Use '#' prefix which comes before digits in ASCII (# = 35, 0 = 48) +static constexpr const char* LAST_FETCH_KEY_STORAGE_KEY = + "#stream_load_recorder_manager_last_fetch_key"; + +StreamLoadRecorderManager::StreamLoadRecorderManager() + : _stop(false), _last_load_time(UnixMillis()), _record_num(0), _last_fetch_key("-1") {} + +StreamLoadRecorderManager::~StreamLoadRecorderManager() {} + +void StreamLoadRecorderManager::start() { + _recorder = ExecEnv::GetInstance()->storage_engine().get_stream_load_recorder(); + _mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, + "StreamLoadRecorderManager"); + _load_last_fetch_key(); + _worker_thread = std::thread(&StreamLoadRecorderManager::_worker_thread_func, this); +} + +void StreamLoadRecorderManager::_load_last_fetch_key() { + if (!_recorder) { + LOG(WARNING) << "StreamLoadRecorder is not initialized"; + return; + } + + std::string value; + Status st = _recorder->get(LAST_FETCH_KEY_STORAGE_KEY, &value); + if (st.ok() && !value.empty()) { + _last_fetch_key = value; + LOG(INFO) << "Loaded stream load recorder manager last fetch key from RocksDB: " + << _last_fetch_key; + } else { + LOG(INFO) << "No stream load recorder manager last fetch key found in RocksDB, starting " + "from beginning"; + _last_fetch_key = "-1"; + } +} + +void StreamLoadRecorderManager::stop() { + _stop = true; + if (_worker_thread.joinable()) { + _worker_thread.join(); + } +} + +void StreamLoadRecorderManager::_worker_thread_func() { + SCOPED_ATTACH_TASK(_mem_tracker); + while (!_stop) { + _fetch_and_buffer_records(); + _load_if_necessary(); + std::this_thread::sleep_for(std::chrono::seconds(1)); + } +} + +void StreamLoadRecorderManager::_fetch_and_buffer_records() { + if (!_recorder) { + LOG(WARNING) << "StreamLoadRecorder is not initialized"; + return; + } + + std::map records; + Status st = + _recorder->get_batch(_last_fetch_key, config::stream_load_record_batch_size, &records); + if (!st.ok()) { + LOG(WARNING) << "Failed to fetch stream load records from RocksDB: " << st; + return; + } + if (records.empty()) { + return; + } + + for (const auto& [key, value] : records) { + if (key == LAST_FETCH_KEY_STORAGE_KEY) { + _last_fetch_key = key; + continue; + } + std::string record_line = _parse_and_format_record(value); + if (!record_line.empty()) { + _buffer.append(record_line); + _buffer.append(LINE_DELIMITER, strlen(LINE_DELIMITER)); + _record_num++; + } + _last_fetch_key = key; + } + + VLOG(1) << "Fetched " << records.size() << " stream load records, total " << _record_num + << " records buffered"; +} + +void StreamLoadRecorderManager::_save_last_fetch_key() { + if (!_recorder) { + LOG(WARNING) << "StreamLoadRecorder is not initialized"; + return; + } + + if (_last_fetch_key == "-1" || _last_fetch_key == LAST_FETCH_KEY_STORAGE_KEY) { + return; + } + + Status st = _recorder->put(LAST_FETCH_KEY_STORAGE_KEY, _last_fetch_key); + if (!st.ok()) { + LOG(WARNING) << "Failed to save stream load recorder manager last fetch key to RocksDB: " + << st; + } +} + +std::string StreamLoadRecorderManager::_parse_and_format_record(const std::string& json_record) { + rapidjson::Document doc; + if (doc.Parse(json_record.data(), json_record.length()).HasParseError()) { + LOG(WARNING) << "Failed to parse JSON record: " << json_record; + return ""; + } + + auto get_string = [&doc](const char* key) -> std::string { + if (doc.HasMember(key) && doc[key].IsString()) { + return doc[key].GetString(); + } + return ""; + }; + + auto get_int64 = [&doc](const char* key) -> int64_t { + if (doc.HasMember(key) && doc[key].IsInt64()) { + return doc[key].GetInt64(); + } + return 0; + }; + + auto timestamp_to_datetime = [](int64_t ts_ms) -> std::string { + if (ts_ms <= 0) return ""; + time_t ts_sec = ts_ms / 1000; + int64_t ms_part = ts_ms % 1000; + struct tm tm_buf; + localtime_r(&ts_sec, &tm_buf); + char buf[64]; + snprintf(buf, sizeof(buf), "%04d-%02d-%02d %02d:%02d:%02d.%03ld", tm_buf.tm_year + 1900, + tm_buf.tm_mon + 1, tm_buf.tm_mday, tm_buf.tm_hour, tm_buf.tm_min, tm_buf.tm_sec, + ms_part); + return std::string(buf); + }; + + std::string label = get_string("Label"); + std::string db = get_string("Db"); + std::string table = get_string("Table"); + std::string user = get_string("User"); + std::string client_ip = get_string("ClientIp"); + std::string status = get_string("Status"); + std::string message = get_string("Message"); + std::string error_url = get_string("ErrorURL"); + int64_t total_rows = get_int64("NumberTotalRows"); + int64_t loaded_rows = get_int64("NumberLoadedRows"); + int64_t filtered_rows = get_int64("NumberFilteredRows"); + int64_t unselected_rows = get_int64("NumberUnselectedRows"); + int64_t load_bytes = get_int64("LoadBytes"); + int64_t start_time = get_int64("StartTime"); + int64_t finish_time = get_int64("FinishTime"); + std::string comment = get_string("Comment"); + int64_t query_time = (finish_time > start_time) ? (finish_time - start_time) : 0; + std::string stmt = fmt::format( + "STREAM LOAD: table={}, label={}, status={}, " + "total_rows={}, loaded_rows={}, filtered_rows={}, " + "unselected_rows={}, load_bytes={}, url={}", + table, label, status, total_rows, loaded_rows, filtered_rows, unselected_rows, + load_bytes, error_url); + if (!comment.empty()) { + stmt += fmt::format(", comment={}", comment); + } + + // Use column mapping in stream load to map these to AUDIT_SCHEMA columns. + std::stringstream ss; + ss << label + << COLUMN_SEPARATOR // query_id, we use label replace it for it is hard to get query id and label query is convenient for users + << timestamp_to_datetime(finish_time) + << COLUMN_SEPARATOR // time (convert to datetime string) + << client_ip << COLUMN_SEPARATOR // client_ip + << user << COLUMN_SEPARATOR // user + << db << COLUMN_SEPARATOR // db + << status << COLUMN_SEPARATOR // state + << message << COLUMN_SEPARATOR // error_message + << query_time << COLUMN_SEPARATOR // query_time + << load_bytes << COLUMN_SEPARATOR // scan_bytes + << total_rows << COLUMN_SEPARATOR // scan_rows + << loaded_rows << COLUMN_SEPARATOR // return_rows + << stmt; // stmt + + return ss.str(); +} + +void StreamLoadRecorderManager::_load_if_necessary() { + int64_t current_time = UnixMillis(); + bool should_load = _buffer.size() >= config::stream_load_record_batch_bytes || + (current_time - _last_load_time) >= + config::stream_load_record_batch_interval_secs * 1000; + if (!should_load || _buffer.size() == 0) { + return; + } + + Status st = _send_stream_load(_buffer.ToString()); + if (!st.ok()) { + LOG(WARNING) << "Failed to load stream load records to audit log table: " << st + << ", discard current batch"; + } + + _save_last_fetch_key(); + + _reset_batch(current_time); +} + +Status StreamLoadRecorderManager::_send_stream_load(const std::string& data) { + std::string url = _generate_url(); + if (url.empty()) { + return Status::InternalError("FE address is not available yet"); + } + std::string label = _generate_label(); + + HttpClient client; + Status st = client.init(url); + if (!st.ok()) { + return Status::InternalError("Failed to init http client: {}", st.to_string()); + } + client.set_authorization("Basic YWRtaW46"); + client.set_header("Expect", "100-continue"); + client.set_content_type("text/plain; charset=UTF-8"); + client.set_header("label", label); + client.set_header("timeout", std::to_string(DEFAULT_STREAM_LOAD_TIMEOUT_SEC)); + client.set_header("max_filter_ratio", "1.0"); + client.set_header("column_separator", COLUMN_SEPARATOR); + client.set_header("line_delimiter", LINE_DELIMITER); + client.set_header("columns", + "query_id,time,client_ip,user,db,state,error_message," + "query_time,scan_bytes,scan_rows,return_rows,stmt"); + client.set_header("skip_record_to_audit_log_table", + "true"); // Prevent infinite loop: don't record audit loads + client.set_timeout_ms(DEFAULT_STREAM_LOAD_TIMEOUT_SEC * 1000); + + std::string response; + st = client.execute_put_request(data, &response); + if (!st.ok()) { + return Status::InternalError("Failed to execute stream load request: {}", st.to_string()); + } + long http_status = client.get_http_status(); + if (http_status != 200) { + return Status::InternalError("Stream load failed with HTTP status {}: {}", http_status, + response); + } + rapidjson::Document doc; + if (!doc.Parse(response.data(), response.length()).HasParseError()) { + if (doc.HasMember("Status") && doc["Status"].IsString()) { + std::string status = doc["Status"].GetString(); + if (status != "Success") { + return Status::InternalError("Stream load status is not Success: {}", response); + } + } + } + + return Status::OK(); +} + +std::string StreamLoadRecorderManager::_generate_label() { + auto now = std::chrono::system_clock::now(); + auto now_time_t = std::chrono::system_clock::to_time_t(now); + auto now_ms = + std::chrono::duration_cast(now.time_since_epoch()) % 1000; + std::tm tm_buf; + localtime_r(&now_time_t, &tm_buf); + return fmt::format("audit_log_{:04d}{:02d}{:02d}_{:02d}{:02d}{:02d}_{:03d}", + tm_buf.tm_year + 1900, tm_buf.tm_mon + 1, tm_buf.tm_mday, tm_buf.tm_hour, + tm_buf.tm_min, tm_buf.tm_sec, static_cast(now_ms.count())); +} + +std::string StreamLoadRecorderManager::_generate_url() { + return fmt::format("http://127.0.0.1:{}/api/{}/{}/_stream_load", config::webserver_port, + DEFAULT_INTERNAL_DB_NAME, STREAM_LOAD_RECORD_TABLE); +} + +void StreamLoadRecorderManager::_reset_batch(int64_t current_time) { + _buffer.clear(); + _last_load_time = current_time; + _record_num = 0; +} + +} // namespace doris diff --git a/be/src/runtime/stream_load/stream_load_recorder_manager.h b/be/src/runtime/stream_load/stream_load_recorder_manager.h new file mode 100644 index 00000000000000..b7908b14b86e58 --- /dev/null +++ b/be/src/runtime/stream_load/stream_load_recorder_manager.h @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include + +#include "util/faststring.h" + +namespace doris { + +class ExecEnv; +class MemTrackerLimiter; +class StreamLoadRecorder; +class Status; + +/** + * StreamLoadRecorderManager is responsible for periodically fetching stream load records + * from RocksDB (via StreamLoadRecorder) and sending them to the FE's audit_log table + * via HTTP stream load. Records are formatted to match AUDIT_SCHEMA. + * + * This is similar to the FE's AuditLoader logic, where audit events are buffered and sent + * to the internal audit table when certain thresholds are reached (size or time-based). + * + * The manager runs a background worker thread that: + * 1. Periodically fetches stream load records from RocksDB + * 2. Parses JSON and formats as TSV matching AUDIT_SCHEMA + * 3. Buffers them in memory + * 4. Sends them via HTTP stream load when: + * - Buffer size exceeds threshold (default: 50MB) + * - Time since last load exceeds threshold (default: 60s) + * - Force flush is requested (e.g., during shutdown) + */ +class StreamLoadRecorderManager { +public: + StreamLoadRecorderManager(); + + ~StreamLoadRecorderManager(); + + void start(); + + void stop(); + +private: + void _load_last_fetch_key(); + + void _worker_thread_func(); + + void _fetch_and_buffer_records(); + + void _save_last_fetch_key(); + + std::string _parse_and_format_record(const std::string& json_record); + + void _load_if_necessary(); + + Status _send_stream_load(const std::string& data); + + std::string _generate_label(); + + std::string _generate_url(); + + void _reset_batch(int64_t current_time); + +private: + std::shared_ptr _recorder; + std::shared_ptr _mem_tracker; + + std::thread _worker_thread; + std::atomic _stop; + + faststring _buffer; + + int64_t _last_load_time; + int64_t _record_num; + + std::string _last_fetch_key; +}; + +} // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java index 2323a2d06bed69..ac8f58d1fa7db5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java @@ -20,6 +20,7 @@ import org.apache.doris.analysis.ColumnDef; import org.apache.doris.analysis.ColumnNullableType; import org.apache.doris.analysis.TypeDef; +import org.apache.doris.common.Config; import org.apache.doris.common.UserException; import org.apache.doris.plugin.audit.AuditLoader; import org.apache.doris.statistics.StatisticConstants; @@ -126,7 +127,7 @@ public class InternalSchema { AUDIT_SCHEMA = new ArrayList<>(); // uuid and time AUDIT_SCHEMA.add(new ColumnDef("query_id", - TypeDef.createVarchar(48), ColumnNullableType.NULLABLE)); + TypeDef.createVarchar(Config.label_regex_length), ColumnNullableType.NULLABLE)); AUDIT_SCHEMA.add(new ColumnDef("time", TypeDef.createDatetimeV2(3), ColumnNullableType.NULLABLE)); // cs info diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java index 3ed9ba14508eb4..fb9ffde02dc3c9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java @@ -75,6 +75,7 @@ private HttpURLConnection getConnection(String urlStr, String label, String clus conn.addRequestProperty("redirect-policy", "random-be"); conn.addRequestProperty("column_separator", AuditLoader.AUDIT_TABLE_COL_SEPARATOR_STR); conn.addRequestProperty("line_delimiter", AuditLoader.AUDIT_TABLE_LINE_DELIMITER_STR); + conn.addRequestProperty("skip_record_to_audit_log_table", "true"); conn.setDoOutput(true); conn.setDoInput(true); return conn; diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_audit_log.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_audit_log.groovy new file mode 100644 index 00000000000000..6bdb980d09505b --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/test_stream_load_audit_log.groovy @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_stream_load_audit_log", "p0") { + def tableName = "test_stream_load_record" + def label = UUID.randomUUID().toString().replaceAll("-", "") + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` bigint(20) NULL, + `k2` bigint(20) NULL, + `v1` tinyint(4) SUM NULL, + `v2` tinyint(4) REPLACE NULL, + `v3` tinyint(4) REPLACE_IF_NOT_NULL NULL, + `v4` smallint(6) REPLACE_IF_NOT_NULL NULL, + `v5` int(11) REPLACE_IF_NOT_NULL NULL, + `v6` bigint(20) REPLACE_IF_NOT_NULL NULL, + `v7` largeint(40) REPLACE_IF_NOT_NULL NULL, + `v8` datetime REPLACE_IF_NOT_NULL NULL, + `v9` date REPLACE_IF_NOT_NULL NULL, + `v10` char(10) REPLACE_IF_NOT_NULL NULL, + `v11` varchar(6) REPLACE_IF_NOT_NULL NULL, + `v12` decimal(27, 9) REPLACE_IF_NOT_NULL NULL + ) ENGINE=OLAP + AGGREGATE KEY(`k1`, `k2`) + COMMENT 'OLAP' + PARTITION BY RANGE(`k1`) + (PARTITION partition_a VALUES [("-9223372036854775808"), ("100000")), + PARTITION partition_b VALUES [("100000"), ("1000000000")), + PARTITION partition_c VALUES [("1000000000"), ("10000000000")), + PARTITION partition_d VALUES [("10000000000"), (MAXVALUE))) + DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', '\t' + set 'label', "${label}" + set 'columns', 'k1, k2, v2, v10, v11' + set 'partitions', 'partition_a, partition_b, partition_c, partition_d' + set 'strict_mode', 'true' + + file 'test_strict_mode.csv' + time 10000 // limit inflight 10s + } + + def count = 0 + while (true) { + sleep(1000) + def res = sql"select * from __internal_schema.audit_log where query_id = '${label}'" + log.info("Audit log result: ${res.toString()}") + if (res.size() > 0) { + break + } + count++ + if (count > 150) { + throw new RuntimeException("Audit log not found after 150 seconds") + } + } +} \ No newline at end of file diff --git a/regression-test/suites/manager/test_manager_interface_1.groovy b/regression-test/suites/manager/test_manager_interface_1.groovy index bdb01c0cce87af..0f4cc0bd4cfe77 100644 --- a/regression-test/suites/manager/test_manager_interface_1.groovy +++ b/regression-test/suites/manager/test_manager_interface_1.groovy @@ -547,7 +547,7 @@ suite('test_manager_interface_1',"p0") { assertTrue(result[0][0] == "audit_log") assertTrue(result[0][1].contains("CREATE TABLE `audit_log`")) - assertTrue(result[0][1].contains("`query_id` varchar(48) NULL,")) + assertTrue(result[0][1].contains("`query_id` varchar(128) NULL,")) assertTrue(result[0][1].contains("`time` datetime(3) NULL,")) assertTrue(result[0][1].contains("`client_ip` varchar(128) NULL,")) assertTrue(result[0][1].contains("`user` varchar(128) NULL,")) From b36abb882fdf3ca6f63e526a3f4ed37a89a13a38 Mon Sep 17 00:00:00 2001 From: laihui Date: Tue, 18 Nov 2025 18:06:31 +0800 Subject: [PATCH 2/4] update --- .../stream_load/stream_load_recorder.cpp | 24 ++++++++++++------- .../stream_load/stream_load_recorder.h | 6 +++-- .../stream_load_recorder_manager.cpp | 15 ++++++------ 3 files changed, 28 insertions(+), 17 deletions(-) diff --git a/be/src/runtime/stream_load/stream_load_recorder.cpp b/be/src/runtime/stream_load/stream_load_recorder.cpp index 03aff056900a1d..c9da300ca732cd 100644 --- a/be/src/runtime/stream_load/stream_load_recorder.cpp +++ b/be/src/runtime/stream_load/stream_load_recorder.cpp @@ -22,7 +22,6 @@ #include #include -#include #include "common/config.h" #include "common/status.h" @@ -63,7 +62,9 @@ Status StreamLoadRecorder::init() { std::vector column_families; // default column family is required column_families.emplace_back(DEFAULT_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions()); - std::vector ttls = {config::stream_load_record_expire_time_secs}; + // meta column family is used for bookkeeping data such as last fetch keys and should not expire + column_families.emplace_back(META_COLUMN_FAMILY, rocksdb::ColumnFamilyOptions()); + std::vector ttls = {config::stream_load_record_expire_time_secs, 0}; rocksdb::DBWithTTL* tmp = _db.release(); rocksdb::Status s = @@ -78,8 +79,9 @@ Status StreamLoadRecorder::init() { return Status::OK(); } -Status StreamLoadRecorder::put(const std::string& key, const std::string& value) { - rocksdb::ColumnFamilyHandle* handle = _handles[0]; +Status StreamLoadRecorder::put(const std::string& key, const std::string& value, bool use_meta_cf) { + rocksdb::ColumnFamilyHandle* handle = + _handles[use_meta_cf ? META_COLUMN_FAMILY_INDEX : DEFAULT_COLUMN_FAMILY_INDEX]; rocksdb::WriteOptions write_options; write_options.sync = false; rocksdb::Status s = _db->Put(write_options, handle, rocksdb::Slice(key), rocksdb::Slice(value)); @@ -89,19 +91,25 @@ Status StreamLoadRecorder::put(const std::string& key, const std::string& value) s.ToString()); } + if (!use_meta_cf) { + return Status::OK(); + } + if ((UnixMillis() - _last_compaction_time) / 1000 > config::clean_stream_load_record_interval_secs) { rocksdb::CompactRangeOptions options; - s = _db->CompactRange(options, _handles[0], nullptr, nullptr); + s = _db->CompactRange(options, _handles[DEFAULT_COLUMN_FAMILY_INDEX], nullptr, nullptr); if (s.ok()) { _last_compaction_time = UnixMillis(); } } + return Status::OK(); } -Status StreamLoadRecorder::get(const std::string& key, std::string* value) { - rocksdb::ColumnFamilyHandle* handle = _handles[0]; +Status StreamLoadRecorder::get(const std::string& key, std::string* value, bool use_meta_cf) { + rocksdb::ColumnFamilyHandle* handle = + _handles[use_meta_cf ? META_COLUMN_FAMILY_INDEX : DEFAULT_COLUMN_FAMILY_INDEX]; rocksdb::ReadOptions read_options; rocksdb::Status s = _db->Get(read_options, handle, rocksdb::Slice(key), value); if (s.IsNotFound()) { @@ -117,7 +125,7 @@ Status StreamLoadRecorder::get(const std::string& key, std::string* value) { Status StreamLoadRecorder::get_batch(const std::string& start, int batch_size, std::map* stream_load_records) { - rocksdb::ColumnFamilyHandle* handle = _handles[0]; + rocksdb::ColumnFamilyHandle* handle = _handles[DEFAULT_COLUMN_FAMILY_INDEX]; std::unique_ptr it(_db->NewIterator(rocksdb::ReadOptions(), handle)); if (start == "-1") { it->SeekToFirst(); diff --git a/be/src/runtime/stream_load/stream_load_recorder.h b/be/src/runtime/stream_load/stream_load_recorder.h index 702382bb93b9cb..e0e452ad962022 100644 --- a/be/src/runtime/stream_load/stream_load_recorder.h +++ b/be/src/runtime/stream_load/stream_load_recorder.h @@ -45,9 +45,9 @@ class StreamLoadRecorder { Status init(); - Status put(const std::string& key, const std::string& value); + Status put(const std::string& key, const std::string& value, bool use_meta_cf = false); - Status get(const std::string& key, std::string* value); + Status get(const std::string& key, std::string* value, bool use_meta_cf = false); Status get_batch(const std::string& start, int batch_size, std::map* stream_load_records); @@ -61,9 +61,11 @@ class StreamLoadRecorder { enum ColumnFamilyIndex { DEFAULT_COLUMN_FAMILY_INDEX = 0, + META_COLUMN_FAMILY_INDEX = 1, }; const std::string DEFAULT_COLUMN_FAMILY = "default"; + const std::string META_COLUMN_FAMILY = "meta"; }; } // namespace doris diff --git a/be/src/runtime/stream_load/stream_load_recorder_manager.cpp b/be/src/runtime/stream_load/stream_load_recorder_manager.cpp index 2457efae2aeaa6..28597b8c6d6ca9 100644 --- a/be/src/runtime/stream_load/stream_load_recorder_manager.cpp +++ b/be/src/runtime/stream_load/stream_load_recorder_manager.cpp @@ -43,14 +43,13 @@ static constexpr const char* DEFAULT_INTERNAL_DB_NAME = "__internal_schema"; static constexpr const char* STREAM_LOAD_RECORD_TABLE = "audit_log"; static constexpr const char* COLUMN_SEPARATOR = "\t"; static constexpr const char* LINE_DELIMITER = "\n"; -// Use '#' prefix which comes before digits in ASCII (# = 35, 0 = 48) static constexpr const char* LAST_FETCH_KEY_STORAGE_KEY = - "#stream_load_recorder_manager_last_fetch_key"; + "stream_load_recorder_manager_last_fetch_key"; StreamLoadRecorderManager::StreamLoadRecorderManager() : _stop(false), _last_load_time(UnixMillis()), _record_num(0), _last_fetch_key("-1") {} -StreamLoadRecorderManager::~StreamLoadRecorderManager() {} +StreamLoadRecorderManager::~StreamLoadRecorderManager() = default; void StreamLoadRecorderManager::start() { _recorder = ExecEnv::GetInstance()->storage_engine().get_stream_load_recorder(); @@ -67,7 +66,7 @@ void StreamLoadRecorderManager::_load_last_fetch_key() { } std::string value; - Status st = _recorder->get(LAST_FETCH_KEY_STORAGE_KEY, &value); + Status st = _recorder->get(LAST_FETCH_KEY_STORAGE_KEY, &value, true); if (st.ok() && !value.empty()) { _last_fetch_key = value; LOG(INFO) << "Loaded stream load recorder manager last fetch key from RocksDB: " @@ -140,7 +139,7 @@ void StreamLoadRecorderManager::_save_last_fetch_key() { return; } - Status st = _recorder->put(LAST_FETCH_KEY_STORAGE_KEY, _last_fetch_key); + Status st = _recorder->put(LAST_FETCH_KEY_STORAGE_KEY, _last_fetch_key, true); if (!st.ok()) { LOG(WARNING) << "Failed to save stream load recorder manager last fetch key to RocksDB: " << st; @@ -169,7 +168,9 @@ std::string StreamLoadRecorderManager::_parse_and_format_record(const std::strin }; auto timestamp_to_datetime = [](int64_t ts_ms) -> std::string { - if (ts_ms <= 0) return ""; + if (ts_ms <= 0) { + return ""; + } time_t ts_sec = ts_ms / 1000; int64_t ms_part = ts_ms % 1000; struct tm tm_buf; @@ -178,7 +179,7 @@ std::string StreamLoadRecorderManager::_parse_and_format_record(const std::strin snprintf(buf, sizeof(buf), "%04d-%02d-%02d %02d:%02d:%02d.%03ld", tm_buf.tm_year + 1900, tm_buf.tm_mon + 1, tm_buf.tm_mday, tm_buf.tm_hour, tm_buf.tm_min, tm_buf.tm_sec, ms_part); - return std::string(buf); + return {buf}; }; std::string label = get_string("Label"); From 79bece94df5b845a69856c947cde5b3f1c205230 Mon Sep 17 00:00:00 2001 From: laihui Date: Tue, 18 Nov 2025 23:18:41 +0800 Subject: [PATCH 3/4] update --- .../src/main/java/org/apache/doris/catalog/InternalSchema.java | 1 - 1 file changed, 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java index 1bd45071c0da14..955e021fa444ff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java @@ -19,7 +19,6 @@ import org.apache.doris.analysis.ColumnDef; import org.apache.doris.analysis.ColumnNullableType; -import org.apache.doris.analysis.TypeDef; import org.apache.doris.common.Config; import org.apache.doris.common.UserException; import org.apache.doris.plugin.audit.AuditLoader; From b80838766b10be1b9a52fa9b6812a32b7d26c832 Mon Sep 17 00:00:00 2001 From: laihui Date: Wed, 19 Nov 2025 14:54:38 +0800 Subject: [PATCH 4/4] update --- be/src/common/config.cpp | 2 +- .../test_stream_load_audit_log.groovy | 104 +++++++++--------- 2 files changed, 56 insertions(+), 50 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index c4ae8099e81930..87c427141b1070 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -628,7 +628,7 @@ DEFINE_mInt32(slave_replica_writer_rpc_timeout_sec, "60"); // False: disable stream load record DEFINE_mBool(enable_stream_load_record, "false"); // Whether to enable stream load record to audit log table, the default is true. -DEFINE_mBool(enable_stream_load_record_to_audit_log_table, "true"); +DEFINE_mBool(enable_stream_load_record_to_audit_log_table, "false"); // the maximum bytes of a batch of stream load records to audit log table DEFINE_mInt64(stream_load_record_batch_bytes, "104857600"); // 100MB // the interval to send a batch of stream load records to audit log table diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_audit_log.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_audit_log.groovy index 6bdb980d09505b..9879dc6c88f62e 100644 --- a/regression-test/suites/load_p0/stream_load/test_stream_load_audit_log.groovy +++ b/regression-test/suites/load_p0/stream_load/test_stream_load_audit_log.groovy @@ -15,63 +15,69 @@ // specific language governing permissions and limitations // under the License. -suite("test_stream_load_audit_log", "p0") { +suite("test_stream_load_audit_log", "nonConcurrent") { def tableName = "test_stream_load_record" def label = UUID.randomUUID().toString().replaceAll("-", "") - sql """ DROP TABLE IF EXISTS ${tableName} """ - sql """ - CREATE TABLE IF NOT EXISTS ${tableName} ( - `k1` bigint(20) NULL, - `k2` bigint(20) NULL, - `v1` tinyint(4) SUM NULL, - `v2` tinyint(4) REPLACE NULL, - `v3` tinyint(4) REPLACE_IF_NOT_NULL NULL, - `v4` smallint(6) REPLACE_IF_NOT_NULL NULL, - `v5` int(11) REPLACE_IF_NOT_NULL NULL, - `v6` bigint(20) REPLACE_IF_NOT_NULL NULL, - `v7` largeint(40) REPLACE_IF_NOT_NULL NULL, - `v8` datetime REPLACE_IF_NOT_NULL NULL, - `v9` date REPLACE_IF_NOT_NULL NULL, - `v10` char(10) REPLACE_IF_NOT_NULL NULL, - `v11` varchar(6) REPLACE_IF_NOT_NULL NULL, - `v12` decimal(27, 9) REPLACE_IF_NOT_NULL NULL - ) ENGINE=OLAP - AGGREGATE KEY(`k1`, `k2`) - COMMENT 'OLAP' - PARTITION BY RANGE(`k1`) - (PARTITION partition_a VALUES [("-9223372036854775808"), ("100000")), - PARTITION partition_b VALUES [("100000"), ("1000000000")), - PARTITION partition_c VALUES [("1000000000"), ("10000000000")), - PARTITION partition_d VALUES [("10000000000"), (MAXVALUE))) - DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3 - PROPERTIES ("replication_allocation" = "tag.location.default: 1"); - """ + try { + set_be_param("enable_stream_load_record_to_audit_log_table", "true") - streamLoad { - table "${tableName}" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` bigint(20) NULL, + `k2` bigint(20) NULL, + `v1` tinyint(4) SUM NULL, + `v2` tinyint(4) REPLACE NULL, + `v3` tinyint(4) REPLACE_IF_NOT_NULL NULL, + `v4` smallint(6) REPLACE_IF_NOT_NULL NULL, + `v5` int(11) REPLACE_IF_NOT_NULL NULL, + `v6` bigint(20) REPLACE_IF_NOT_NULL NULL, + `v7` largeint(40) REPLACE_IF_NOT_NULL NULL, + `v8` datetime REPLACE_IF_NOT_NULL NULL, + `v9` date REPLACE_IF_NOT_NULL NULL, + `v10` char(10) REPLACE_IF_NOT_NULL NULL, + `v11` varchar(6) REPLACE_IF_NOT_NULL NULL, + `v12` decimal(27, 9) REPLACE_IF_NOT_NULL NULL + ) ENGINE=OLAP + AGGREGATE KEY(`k1`, `k2`) + COMMENT 'OLAP' + PARTITION BY RANGE(`k1`) + (PARTITION partition_a VALUES [("-9223372036854775808"), ("100000")), + PARTITION partition_b VALUES [("100000"), ("1000000000")), + PARTITION partition_c VALUES [("1000000000"), ("10000000000")), + PARTITION partition_d VALUES [("10000000000"), (MAXVALUE))) + DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); + """ - set 'column_separator', '\t' - set 'label', "${label}" - set 'columns', 'k1, k2, v2, v10, v11' - set 'partitions', 'partition_a, partition_b, partition_c, partition_d' - set 'strict_mode', 'true' + streamLoad { + table "${tableName}" - file 'test_strict_mode.csv' - time 10000 // limit inflight 10s - } + set 'column_separator', '\t' + set 'label', "${label}" + set 'columns', 'k1, k2, v2, v10, v11' + set 'partitions', 'partition_a, partition_b, partition_c, partition_d' + set 'strict_mode', 'true' - def count = 0 - while (true) { - sleep(1000) - def res = sql"select * from __internal_schema.audit_log where query_id = '${label}'" - log.info("Audit log result: ${res.toString()}") - if (res.size() > 0) { - break + file 'test_strict_mode.csv' + time 10000 // limit inflight 10s } - count++ - if (count > 150) { - throw new RuntimeException("Audit log not found after 150 seconds") + + def count = 0 + while (true) { + sleep(1000) + def res = sql"select * from __internal_schema.audit_log where query_id = '${label}'" + log.info("Audit log result: ${res.toString()}") + if (res.size() > 0) { + break + } + count++ + if (count > 150) { + throw new RuntimeException("Audit log not found after 150 seconds") + } } + } finally { + set_be_param("enable_stream_load_record_to_audit_log_table", "false") } } \ No newline at end of file