From 229945264ab118aedcf5581aca229e9429b3efc7 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Thu, 26 Jun 2025 11:55:23 +0800 Subject: [PATCH 1/3] [fix](load) fix invalid load id due to injection --- be/src/runtime/load_stream.cpp | 13 +++++++------ be/src/runtime/load_stream.h | 6 +++--- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 8cd42d2f90b89f..9a04de656d7512 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -57,7 +57,7 @@ bvar::Adder g_load_stream_cnt("load_stream_count"); bvar::LatencyRecorder g_load_stream_flush_wait_ms("load_stream_flush_wait_ms"); bvar::Adder g_load_stream_flush_running_threads("load_stream_flush_wait_threads"); -TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id, +TabletStream::TabletStream(const PUniqueId& load_id, int64_t id, int64_t txn_id, LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile) : _id(id), _next_segid(0), @@ -332,7 +332,7 @@ Status TabletStream::close() { return _status.status(); } -IndexStream::IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id, +IndexStream::IndexStream(const PUniqueId& load_id, int64_t id, int64_t txn_id, std::shared_ptr schema, LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile) : _id(id), @@ -415,7 +415,7 @@ void IndexStream::close(const std::vector& tablets_to_commit, // TODO: Profile is temporary disabled, because: // 1. It's not being processed by the upstream for now // 2. There are some problems in _profile->to_thrift() -LoadStream::LoadStream(PUniqueId load_id, LoadStreamMgr* load_stream_mgr, bool enable_profile) +LoadStream::LoadStream(const PUniqueId& load_id, LoadStreamMgr* load_stream_mgr, bool enable_profile) : _load_id(load_id), _enable_profile(false), _load_stream_mgr(load_stream_mgr) { g_load_stream_cnt << 1; _profile = std::make_unique("LoadStream"); @@ -643,9 +643,10 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* // otherwise the message will be ignored and causing close wait timeout if (hdr.opcode() != PStreamHeader::CLOSE_LOAD) { DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_loadid", { - PUniqueId& load_id = const_cast(hdr.load_id()); - load_id.set_hi(UNKNOWN_ID_FOR_TEST); - load_id.set_lo(UNKNOWN_ID_FOR_TEST); + PStreamHeader& t_hdr = const_cast(hdr); + PUniqueId* load_id = t_hdr.mutable_load_id(); + load_id->set_hi(UNKNOWN_ID_FOR_TEST); + load_id->set_lo(UNKNOWN_ID_FOR_TEST); }); DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_srcid", { PStreamHeader& t_hdr = const_cast(hdr); diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h index c3719111b89ef4..8c0561ea876728 100644 --- a/be/src/runtime/load_stream.h +++ b/be/src/runtime/load_stream.h @@ -44,7 +44,7 @@ using SegIdMapping = std::vector; using FailedTablets = std::vector>; class TabletStream { public: - TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id, LoadStreamMgr* load_stream_mgr, + TabletStream(const PUniqueId& load_id, int64_t id, int64_t txn_id, LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile); Status init(std::shared_ptr schema, int64_t index_id, @@ -85,7 +85,7 @@ using TabletStreamSharedPtr = std::shared_ptr; class IndexStream { public: - IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id, + IndexStream(const PUniqueId& load_id, int64_t id, int64_t txn_id, std::shared_ptr schema, LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile); @@ -116,7 +116,7 @@ using IndexStreamSharedPtr = std::shared_ptr; using StreamId = brpc::StreamId; class LoadStream : public brpc::StreamInputHandler { public: - LoadStream(PUniqueId load_id, LoadStreamMgr* load_stream_mgr, bool enable_profile); + LoadStream(const PUniqueId& load_id, LoadStreamMgr* load_stream_mgr, bool enable_profile); ~LoadStream() override; Status init(const POpenLoadStreamRequest* request); From cc91de7d0d549387d0cc55dea5e5307ef64bc503 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Fri, 27 Jun 2025 14:22:10 +0800 Subject: [PATCH 2/3] fix style --- be/src/runtime/load_stream.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 9a04de656d7512..c60a4664e5593c 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -415,7 +415,8 @@ void IndexStream::close(const std::vector& tablets_to_commit, // TODO: Profile is temporary disabled, because: // 1. It's not being processed by the upstream for now // 2. There are some problems in _profile->to_thrift() -LoadStream::LoadStream(const PUniqueId& load_id, LoadStreamMgr* load_stream_mgr, bool enable_profile) +LoadStream::LoadStream(const PUniqueId& load_id, LoadStreamMgr* load_stream_mgr, + bool enable_profile) : _load_id(load_id), _enable_profile(false), _load_stream_mgr(load_stream_mgr) { g_load_stream_cnt << 1; _profile = std::make_unique("LoadStream"); From 6190239864162b0dc1b73e1af5c5c24361ca8d0c Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Fri, 27 Jun 2025 14:24:12 +0800 Subject: [PATCH 3/3] fix --- be/src/runtime/load_stream.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h index 8c0561ea876728..6e84a3f12f8b5c 100644 --- a/be/src/runtime/load_stream.h +++ b/be/src/runtime/load_stream.h @@ -44,8 +44,8 @@ using SegIdMapping = std::vector; using FailedTablets = std::vector>; class TabletStream { public: - TabletStream(const PUniqueId& load_id, int64_t id, int64_t txn_id, LoadStreamMgr* load_stream_mgr, - RuntimeProfile* profile); + TabletStream(const PUniqueId& load_id, int64_t id, int64_t txn_id, + LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile); Status init(std::shared_ptr schema, int64_t index_id, int64_t partition_id);