8 changes: 8 additions & 0 deletions be/src/cloud/cloud_backend_service.cpp
@@ -29,6 +29,8 @@
#include "common/status.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
#include "runtime/stream_load/stream_load_context.h"
#include "runtime/stream_load/stream_load_recorder.h"
#include "util/brpc_client_cache.h" // BrpcClientCache
#include "util/thrift_server.h"

@@ -186,4 +188,10 @@ void CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse
response.status = t_status;
}

+void CloudBackendService::get_stream_load_record(TStreamLoadRecordResult& result,
+                                                 int64_t last_stream_record_time) {
+    BaseBackendService::get_stream_load_record(result, last_stream_record_time,
+                                               _engine.get_stream_load_recorder());
+}
+
} // namespace doris
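
Context: cloud BEs previously inherited the BaseBackendService default for this RPC, which only logs an error (the default is still visible in backend_service.cpp below):

    void BaseBackendService::get_stream_load_record(TStreamLoadRecordResult& result,
                                                    int64_t last_stream_record_time) {
        LOG(ERROR) << "get_stream_load_record is not implemented";
    }

The new override instead forwards to the shared protected helper, passing the cloud engine's recorder.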
3 changes: 3 additions & 0 deletions be/src/cloud/cloud_backend_service.h
@@ -53,6 +53,9 @@ class CloudBackendService final : public BaseBackendService {
void check_warm_up_cache_async(TCheckWarmUpCacheAsyncResponse& response,
const TCheckWarmUpCacheAsyncRequest& request) override;

+    void get_stream_load_record(TStreamLoadRecordResult& result,
+                                int64_t last_stream_record_time) override;
+
private:
CloudStorageEngine& _engine;
};
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_storage_engine.cpp
@@ -196,6 +196,10 @@ Status CloudStorageEngine::open() {

_tablet_hotspot = std::make_unique<TabletHotspot>();

+    RETURN_NOT_OK_STATUS_WITH_WARN(
+            init_stream_load_recorder(ExecEnv::GetInstance()->store_paths()[0].path),
+            "init StreamLoadRecorder failed");
+
return ThreadPoolBuilder("SyncLoadForTabletsThreadPool")
.set_max_threads(config::sync_load_for_tablets_thread)
.set_min_threads(config::sync_load_for_tablets_thread)
8 changes: 5 additions & 3 deletions be/src/http/action/http_stream.cpp
@@ -30,6 +30,7 @@
#include <rapidjson/prettywriter.h>
#include <thrift/protocol/TDebugProtocol.h>

#include "cloud/cloud_storage_engine.h"
#include "cloud/config.h"
#include "common/config.h"
#include "common/consts.h"
@@ -119,7 +120,7 @@ void HttpStreamAction::handle(HttpRequest* req) {
// add new line at end
str = str + '\n';
HttpChannel::send_reply(req, str);
-    if (config::enable_stream_load_record && !config::is_cloud_mode()) {
+    if (config::enable_stream_load_record) {
str = ctx->prepare_stream_load_record(str);
_save_stream_load_record(ctx, str);
}
@@ -364,8 +365,9 @@ Status HttpStreamAction::process_put(HttpRequest* http_req,

void HttpStreamAction::_save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx,
const std::string& str) {
-    auto stream_load_recorder =
-            ExecEnv::GetInstance()->storage_engine().to_local().get_stream_load_recorder();
+    std::shared_ptr<StreamLoadRecorder> stream_load_recorder =
+            ExecEnv::GetInstance()->storage_engine().get_stream_load_recorder();
+
if (stream_load_recorder != nullptr) {
std::string key =
std::to_string(ctx->start_millis + ctx->load_cost_millis) + "_" + ctx->label;
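
Note on the record key: both HTTP actions key each record as "<start_millis + load_cost_millis>_<label>", i.e. the load's finish time in milliseconds followed by its label. RocksDB iterates keys in lexicographic order, and millisecond epoch timestamps all have 13 digits for the foreseeable future, so a scan starting from the last timestamp the FE saw returns records in finish order. A self-contained sketch of the scheme (make_record_key is an illustrative name, not from the PR):

    #include <cstdint>
    #include <string>

    // Builds the "<finish_millis>_<label>" key that _save_stream_load_record
    // composes above; finish time = start_millis + load_cost_millis.
    std::string make_record_key(int64_t start_millis, int64_t load_cost_millis,
                                const std::string& label) {
        return std::to_string(start_millis + load_cost_millis) + "_" + label;
    }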
8 changes: 5 additions & 3 deletions be/src/http/action/stream_load.cpp
@@ -39,6 +39,7 @@
#include <stdexcept>
#include <utility>

#include "cloud/cloud_storage_engine.h"
#include "cloud/config.h"
#include "common/config.h"
#include "common/consts.h"
@@ -217,7 +218,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
str = str + '\n';
HttpChannel::send_reply(req, str);
#ifndef BE_TEST
-    if (config::enable_stream_load_record && !config::is_cloud_mode()) {
+    if (config::enable_stream_load_record) {
str = ctx->prepare_stream_load_record(str);
_save_stream_load_record(ctx, str);
}
@@ -705,8 +706,9 @@ Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* file_pa

void StreamLoadAction::_save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx,
const std::string& str) {
-    auto stream_load_recorder =
-            ExecEnv::GetInstance()->storage_engine().to_local().get_stream_load_recorder();
+    std::shared_ptr<StreamLoadRecorder> stream_load_recorder =
+            ExecEnv::GetInstance()->storage_engine().get_stream_load_recorder();
+
if (stream_load_recorder != nullptr) {
std::string key =
std::to_string(ctx->start_millis + ctx->load_cost_millis) + "_" + ctx->label;
41 changes: 20 additions & 21 deletions be/src/olap/storage_engine.cpp
@@ -134,6 +134,25 @@ int64_t BaseStorageEngine::memory_limitation_bytes_per_thread_for_schema_change(
config::memory_limitation_per_thread_for_schema_change_bytes);
}

+Status BaseStorageEngine::init_stream_load_recorder(const std::string& stream_load_record_path) {
+    LOG(INFO) << "stream load record path: " << stream_load_record_path;
+    // init stream load record rocksdb
+    _stream_load_recorder = StreamLoadRecorder::create_shared(stream_load_record_path);
+    if (_stream_load_recorder == nullptr) {
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                Status::MemoryAllocFailed("allocate memory for StreamLoadRecorder failed"),
+                "new StreamLoadRecorder failed");
+    }
+    auto st = _stream_load_recorder->init();
+    if (!st.ok()) {
+        RETURN_NOT_OK_STATUS_WITH_WARN(
+                Status::IOError("open StreamLoadRecorder rocksdb failed, path={}",
+                                stream_load_record_path),
+                "init StreamLoadRecorder failed");
+    }
+    return Status::OK();
+}
+
static Status _validate_options(const EngineOptions& options) {
if (options.store_paths.empty()) {
return Status::InternalError("store paths is empty");
@@ -158,7 +177,6 @@ StorageEngine::StorageEngine(const EngineOptions& options)
_tablet_manager(new TabletManager(*this, config::tablet_map_shard_size)),
_txn_manager(new TxnManager(*this, config::txn_map_shard_size, config::txn_shard_size)),
_default_rowset_type(BETA_ROWSET),
-        _stream_load_recorder(nullptr),
_create_tablet_idx_lru_cache(
new CreateTabletIdxCache(config::partition_disk_index_lru_size)),
_snapshot_mgr(std::make_unique<SnapshotManager>(*this)) {
@@ -274,31 +292,12 @@ Status StorageEngine::_init_store_map() {
return Status::InternalError("init path failed, error={}", error_msg);
}

-    RETURN_NOT_OK_STATUS_WITH_WARN(_init_stream_load_recorder(_options.store_paths[0].path),
+    RETURN_NOT_OK_STATUS_WITH_WARN(init_stream_load_recorder(_options.store_paths[0].path),
"init StreamLoadRecorder failed");

return Status::OK();
}

-Status StorageEngine::_init_stream_load_recorder(const std::string& stream_load_record_path) {
-    LOG(INFO) << "stream load record path: " << stream_load_record_path;
-    // init stream load record rocksdb
-    _stream_load_recorder = StreamLoadRecorder::create_shared(stream_load_record_path);
-    if (_stream_load_recorder == nullptr) {
-        RETURN_NOT_OK_STATUS_WITH_WARN(
-                Status::MemoryAllocFailed("allocate memory for StreamLoadRecorder failed"),
-                "new StreamLoadRecorder failed");
-    }
-    auto st = _stream_load_recorder->init();
-    if (!st.ok()) {
-        RETURN_NOT_OK_STATUS_WITH_WARN(
-                Status::IOError("open StreamLoadRecorder rocksdb failed, path={}",
-                                stream_load_record_path),
-                "init StreamLoadRecorder failed");
-    }
-    return Status::OK();
-}
-
void StorageEngine::_update_storage_medium_type_count() {
set<TStorageMedium::type> available_storage_medium_types;

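
The function body is unchanged: init_stream_load_recorder moves verbatim from StorageEngine (where it was the private _init_stream_load_recorder, removed above) up into BaseStorageEngine, so CloudStorageEngine::open() can call it too. What it initializes is a RocksDB-backed store that get_stream_load_record later pages through. A standalone analogue of that paging contract, with a std::map standing in for RocksDB since both iterate keys in lexicographic order (get_batch_analogue and its signature are illustrative, not the real API; the real batch size comes from config::stream_load_record_batch_size):

    #include <map>
    #include <string>

    // Return up to batch_size records whose keys sort after last_key --
    // the incremental fetch the FE performs with its last seen timestamp.
    std::map<std::string, std::string> get_batch_analogue(
            const std::map<std::string, std::string>& store,
            const std::string& last_key, int batch_size) {
        std::map<std::string, std::string> out;
        for (auto it = store.upper_bound(last_key);
             it != store.end() && out.size() < static_cast<size_t>(batch_size); ++it) {
            out.emplace(it->first, it->second);
        }
        return out;
    }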
16 changes: 8 additions & 8 deletions be/src/olap/storage_engine.h
@@ -133,6 +133,12 @@ class BaseStorageEngine {

int get_disk_num() { return _disk_num; }

+    Status init_stream_load_recorder(const std::string& stream_load_record_path);
+
+    const std::shared_ptr<StreamLoadRecorder>& get_stream_load_recorder() {
+        return _stream_load_recorder;
+    }
+
protected:
void _evict_querying_rowset();
void _evict_quring_rowset_thread_callback();
@@ -157,6 +163,8 @@
int64_t _memory_limitation_bytes_for_schema_change;

    int _disk_num {-1};
+
+    std::shared_ptr<StreamLoadRecorder> _stream_load_recorder;
};

class StorageEngine final : public BaseStorageEngine {
@@ -246,10 +254,6 @@

bool should_fetch_from_peer(int64_t tablet_id);

-    const std::shared_ptr<StreamLoadRecorder>& get_stream_load_recorder() {
-        return _stream_load_recorder;
-    }
-
Status get_compaction_status_json(std::string* result);

// check cumulative compaction config
@@ -349,8 +353,6 @@
void _pop_tablet_from_submitted_compaction(TabletSharedPtr tablet,
CompactionType compaction_type);

-    Status _init_stream_load_recorder(const std::string& stream_load_record_path);
-
Status _submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type,
bool force);

@@ -470,8 +472,6 @@
std::mutex _compaction_producer_sleep_mutex;
std::condition_variable _compaction_producer_sleep_cv;

-    std::shared_ptr<StreamLoadRecorder> _stream_load_recorder;
-
// we use unordered_map to store all cumulative compaction policy sharded ptr
std::unordered_map<std::string_view, std::shared_ptr<CumulativeCompactionPolicy>>
_cumulative_compaction_policies;
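
Condensed, the header refactor looks like this (declarations abridged; only the moved members shown):

    class BaseStorageEngine {
    public:
        Status init_stream_load_recorder(const std::string& stream_load_record_path);
        const std::shared_ptr<StreamLoadRecorder>& get_stream_load_recorder() {
            return _stream_load_recorder;
        }
    protected:
        std::shared_ptr<StreamLoadRecorder> _stream_load_recorder;
    };

    class StorageEngine final : public BaseStorageEngine { /* local storage */ };
    class CloudStorageEngine final : public BaseStorageEngine { /* cloud storage */ };

With the recorder on the base class, callers such as the two HTTP actions no longer need the to_local() downcast that previously made the feature local-only.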
48 changes: 27 additions & 21 deletions be/src/service/backend_service.cpp
@@ -923,27 +923,8 @@ void BaseBackendService::close_scanner(TScanCloseResult& result_, const TScanClo

void BackendService::get_stream_load_record(TStreamLoadRecordResult& result,
int64_t last_stream_record_time) {
-    auto stream_load_recorder = _engine.get_stream_load_recorder();
-    if (stream_load_recorder != nullptr) {
-        std::map<std::string, std::string> records;
-        auto st = stream_load_recorder->get_batch(std::to_string(last_stream_record_time),
-                                                  config::stream_load_record_batch_size, &records);
-        if (st.ok()) {
-            LOG(INFO) << "get_batch stream_load_record rocksdb successfully. records size: "
-                      << records.size()
-                      << ", last_stream_load_timestamp: " << last_stream_record_time;
-            std::map<std::string, TStreamLoadRecord> stream_load_record_batch;
-            auto it = records.begin();
-            for (; it != records.end(); ++it) {
-                TStreamLoadRecord stream_load_item;
-                StreamLoadContext::parse_stream_load_record(it->second, stream_load_item);
-                stream_load_record_batch.emplace(it->first.c_str(), stream_load_item);
-            }
-            result.__set_stream_load_record(stream_load_record_batch);
-        }
-    } else {
-        LOG(WARNING) << "stream_load_recorder is null.";
-    }
+    BaseBackendService::get_stream_load_record(result, last_stream_record_time,
+                                               _engine.get_stream_load_recorder());
}

void BackendService::check_storage_format(TCheckStorageFormatResult& result) {
@@ -1199,6 +1180,31 @@ void BaseBackendService::get_stream_load_record(TStreamLoadRecordResult& result,
LOG(ERROR) << "get_stream_load_record is not implemented";
}

+void BaseBackendService::get_stream_load_record(
+        TStreamLoadRecordResult& result, int64_t last_stream_record_time,
+        std::shared_ptr<StreamLoadRecorder> stream_load_recorder) {
+    if (stream_load_recorder != nullptr) {
+        std::map<std::string, std::string> records;
+        auto st = stream_load_recorder->get_batch(std::to_string(last_stream_record_time),
+                                                  config::stream_load_record_batch_size, &records);
+        if (st.ok()) {
+            LOG(INFO) << "get_batch stream_load_record rocksdb successfully. records size: "
+                      << records.size()
+                      << ", last_stream_load_timestamp: " << last_stream_record_time;
+            std::map<std::string, TStreamLoadRecord> stream_load_record_batch;
+            auto it = records.begin();
+            for (; it != records.end(); ++it) {
+                TStreamLoadRecord stream_load_item;
+                StreamLoadContext::parse_stream_load_record(it->second, stream_load_item);
+                stream_load_record_batch.emplace(it->first.c_str(), stream_load_item);
+            }
+            result.__set_stream_load_record(stream_load_record_batch);
+        }
+    } else {
+        LOG(WARNING) << "stream_load_recorder is null.";
+    }
+}
+
void BaseBackendService::get_disk_trash_used_capacity(std::vector<TDiskTrashInfo>& diskTrashInfos) {
LOG(ERROR) << "get_disk_trash_used_capacity is not implemented";
}
4 changes: 4 additions & 0 deletions be/src/service/backend_service.h
@@ -26,6 +26,7 @@
#include "agent/agent_server.h"
#include "agent/topic_subscriber.h"
#include "common/status.h"
#include "runtime/stream_load/stream_load_recorder.h"

namespace doris {

@@ -162,6 +163,9 @@
protected:
Status start_plan_fragment_execution(const TExecPlanFragmentParams& exec_params);

+    void get_stream_load_record(TStreamLoadRecordResult& result, int64_t last_stream_record_time,
+                                std::shared_ptr<StreamLoadRecorder> stream_load_recorder);
+
ExecEnv* _exec_env = nullptr;
std::unique_ptr<AgentServer> _agent_server;
std::unique_ptr<ThreadPool> _ingest_binlog_workers;
@@ -150,16 +150,19 @@ public void addStreamLoadRecord(long dbId, String label, StreamLoadRecord stream
}

public List<StreamLoadItem> getStreamLoadRecords() {
LOG.info("test log: {}", streamLoadRecordHeap);
return new ArrayList<>(streamLoadRecordHeap);
}

public List<List<Comparable>> getStreamLoadRecordByDb(
long dbId, String label, boolean accurateMatch, StreamLoadState state) {
LinkedList<List<Comparable>> streamLoadRecords = new LinkedList<List<Comparable>>();
LOG.info("test log: {}", dbId);

readLock();
try {
if (!dbIdToLabelToStreamLoadRecord.containsKey(dbId)) {
LOG.info("test log: {}", dbId);
return streamLoadRecords;
}

@@ -202,6 +205,7 @@
}

}
LOG.info("test log: {}", streamLoadRecords);
return streamLoadRecords;
} finally {
readUnlock();
@@ -263,19 +267,17 @@ protected void runAfterCatalogReady() {
TimeUtils.getDatetimeMsFormatWithTimeZone());
String finishTime = TimeUtils.longToTimeString(streamLoadItem.getFinishTime(),
TimeUtils.getDatetimeMsFormatWithTimeZone());
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("receive stream load record info from backend: {}."
-                                + " label: {}, db: {}, tbl: {}, user: {}, user_ip: {},"
-                                + " status: {}, message: {}, error_url: {},"
-                                + " total_rows: {}, loaded_rows: {}, filtered_rows: {}, unselected_rows: {},"
-                                + " load_bytes: {}, start_time: {}, finish_time: {}.",
-                                backend.getHost(), streamLoadItem.getLabel(), streamLoadItem.getDb(),
-                                streamLoadItem.getTbl(), streamLoadItem.getUser(), streamLoadItem.getUserIp(),
-                                streamLoadItem.getStatus(), streamLoadItem.getMessage(), streamLoadItem.getUrl(),
-                                streamLoadItem.getTotalRows(), streamLoadItem.getLoadedRows(),
-                                streamLoadItem.getFilteredRows(), streamLoadItem.getUnselectedRows(),
-                                streamLoadItem.getLoadBytes(), startTime, finishTime);
-                    }
+                    LOG.info("receive stream load record info from backend: {}."
+                            + " label: {}, db: {}, tbl: {}, user: {}, user_ip: {},"
+                            + " status: {}, message: {}, error_url: {},"
+                            + " total_rows: {}, loaded_rows: {}, filtered_rows: {}, unselected_rows: {},"
+                            + " load_bytes: {}, start_time: {}, finish_time: {}.",
+                            backend.getHost(), streamLoadItem.getLabel(), streamLoadItem.getDb(),
+                            streamLoadItem.getTbl(), streamLoadItem.getUser(), streamLoadItem.getUserIp(),
+                            streamLoadItem.getStatus(), streamLoadItem.getMessage(), streamLoadItem.getUrl(),
+                            streamLoadItem.getTotalRows(), streamLoadItem.getLoadedRows(),
+                            streamLoadItem.getFilteredRows(), streamLoadItem.getUnselectedRows(),
+                            streamLoadItem.getLoadBytes(), startTime, finishTime);

AuditEvent auditEvent =
new StreamLoadAuditEvent.AuditEventBuilder().setEventType(EventType.STREAM_LOAD_FINISH)
2 changes: 2 additions & 0 deletions regression-test/pipeline/cloud_p0/conf/be_custom.conf
@@ -30,4 +30,6 @@ file_cache_path = [{"path":"/data/doris_cloud/file_cache","total_size":104857600
tmp_file_dirs = [{"path":"/data/doris_cloud/tmp","max_cache_bytes":104857600,"max_upload_bytes":104857600}]
thrift_rpc_timeout_ms = 360000
save_load_error_log_to_s3 = true
+enable_stream_load_record = true
+stream_load_record_batch_size = 500
webserver_num_workers = 128
1 change: 1 addition & 0 deletions regression-test/pipeline/p0/conf/be.conf
@@ -47,6 +47,7 @@ max_garbage_sweep_interval=180

log_buffer_level = -1
enable_stream_load_record = true
+stream_load_record_batch_size = 500
storage_root_path=/mnt/ssd01/cluster_storage/doris.SSD/P0/cluster1;/mnt/ssd01/cluster_storage/doris.SSD
disable_auto_compaction=true
priority_networks=172.19.0.0/24
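
These pipeline settings exercise the new path end to end: enable_stream_load_record (off by default) makes BEs persist records, which, if memory serves from the Doris docs, is also what the FE's SHOW STREAM LOAD output depends on, and stream_load_record_batch_size = 500 is the cap passed to get_batch for each get_stream_load_record RPC.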