diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 8cd42d2f90b89f..c60a4664e5593c 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -57,7 +57,7 @@ bvar::Adder g_load_stream_cnt("load_stream_count"); bvar::LatencyRecorder g_load_stream_flush_wait_ms("load_stream_flush_wait_ms"); bvar::Adder g_load_stream_flush_running_threads("load_stream_flush_wait_threads"); -TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id, +TabletStream::TabletStream(const PUniqueId& load_id, int64_t id, int64_t txn_id, LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile) : _id(id), _next_segid(0), @@ -332,7 +332,7 @@ Status TabletStream::close() { return _status.status(); } -IndexStream::IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id, +IndexStream::IndexStream(const PUniqueId& load_id, int64_t id, int64_t txn_id, std::shared_ptr schema, LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile) : _id(id), @@ -415,7 +415,8 @@ void IndexStream::close(const std::vector& tablets_to_commit, // TODO: Profile is temporary disabled, because: // 1. It's not being processed by the upstream for now // 2. There are some problems in _profile->to_thrift() -LoadStream::LoadStream(PUniqueId load_id, LoadStreamMgr* load_stream_mgr, bool enable_profile) +LoadStream::LoadStream(const PUniqueId& load_id, LoadStreamMgr* load_stream_mgr, + bool enable_profile) : _load_id(load_id), _enable_profile(false), _load_stream_mgr(load_stream_mgr) { g_load_stream_cnt << 1; _profile = std::make_unique("LoadStream"); @@ -643,9 +644,10 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* // otherwise the message will be ignored and causing close wait timeout if (hdr.opcode() != PStreamHeader::CLOSE_LOAD) { DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_loadid", { - PUniqueId& load_id = const_cast(hdr.load_id()); - load_id.set_hi(UNKNOWN_ID_FOR_TEST); - load_id.set_lo(UNKNOWN_ID_FOR_TEST); + PStreamHeader& t_hdr = const_cast(hdr); + PUniqueId* load_id = t_hdr.mutable_load_id(); + load_id->set_hi(UNKNOWN_ID_FOR_TEST); + load_id->set_lo(UNKNOWN_ID_FOR_TEST); }); DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_srcid", { PStreamHeader& t_hdr = const_cast(hdr); diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h index c3719111b89ef4..6e84a3f12f8b5c 100644 --- a/be/src/runtime/load_stream.h +++ b/be/src/runtime/load_stream.h @@ -44,8 +44,8 @@ using SegIdMapping = std::vector; using FailedTablets = std::vector>; class TabletStream { public: - TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id, LoadStreamMgr* load_stream_mgr, - RuntimeProfile* profile); + TabletStream(const PUniqueId& load_id, int64_t id, int64_t txn_id, + LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile); Status init(std::shared_ptr schema, int64_t index_id, int64_t partition_id); @@ -85,7 +85,7 @@ using TabletStreamSharedPtr = std::shared_ptr; class IndexStream { public: - IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id, + IndexStream(const PUniqueId& load_id, int64_t id, int64_t txn_id, std::shared_ptr schema, LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile); @@ -116,7 +116,7 @@ using IndexStreamSharedPtr = std::shared_ptr; using StreamId = brpc::StreamId; class LoadStream : public brpc::StreamInputHandler { public: - LoadStream(PUniqueId load_id, LoadStreamMgr* load_stream_mgr, bool enable_profile); + LoadStream(const PUniqueId& load_id, LoadStreamMgr* load_stream_mgr, bool enable_profile); ~LoadStream() override; Status init(const POpenLoadStreamRequest* request);