Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ bvar::Adder<int64_t> g_load_stream_cnt("load_stream_count");
bvar::LatencyRecorder g_load_stream_flush_wait_ms("load_stream_flush_wait_ms");
bvar::Adder<int> g_load_stream_flush_running_threads("load_stream_flush_wait_threads");

TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id,
TabletStream::TabletStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile)
: _id(id),
_next_segid(0),
Expand Down Expand Up @@ -332,7 +332,7 @@ Status TabletStream::close() {
return _status.status();
}

IndexStream::IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id,
IndexStream::IndexStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
std::shared_ptr<OlapTableSchemaParam> schema,
LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile)
: _id(id),
Expand Down Expand Up @@ -415,7 +415,8 @@ void IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
// TODO: Profile is temporary disabled, because:
// 1. It's not being processed by the upstream for now
// 2. There are some problems in _profile->to_thrift()
LoadStream::LoadStream(PUniqueId load_id, LoadStreamMgr* load_stream_mgr, bool enable_profile)
LoadStream::LoadStream(const PUniqueId& load_id, LoadStreamMgr* load_stream_mgr,
bool enable_profile)
: _load_id(load_id), _enable_profile(false), _load_stream_mgr(load_stream_mgr) {
g_load_stream_cnt << 1;
_profile = std::make_unique<RuntimeProfile>("LoadStream");
Expand Down Expand Up @@ -643,9 +644,10 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf*
// otherwise the message will be ignored and causing close wait timeout
if (hdr.opcode() != PStreamHeader::CLOSE_LOAD) {
DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_loadid", {
PUniqueId& load_id = const_cast<PUniqueId&>(hdr.load_id());
load_id.set_hi(UNKNOWN_ID_FOR_TEST);
load_id.set_lo(UNKNOWN_ID_FOR_TEST);
PStreamHeader& t_hdr = const_cast<PStreamHeader&>(hdr);
PUniqueId* load_id = t_hdr.mutable_load_id();
load_id->set_hi(UNKNOWN_ID_FOR_TEST);
load_id->set_lo(UNKNOWN_ID_FOR_TEST);
});
DBUG_EXECUTE_IF("LoadStream._dispatch.unknown_srcid", {
PStreamHeader& t_hdr = const_cast<PStreamHeader&>(hdr);
Expand Down
8 changes: 4 additions & 4 deletions be/src/runtime/load_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ using SegIdMapping = std::vector<uint32_t>;
using FailedTablets = std::vector<std::pair<int64_t, Status>>;
class TabletStream {
public:
TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id, LoadStreamMgr* load_stream_mgr,
RuntimeProfile* profile);
TabletStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile);

Status init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t index_id,
int64_t partition_id);
Expand Down Expand Up @@ -85,7 +85,7 @@ using TabletStreamSharedPtr = std::shared_ptr<TabletStream>;

class IndexStream {
public:
IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id,
IndexStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
std::shared_ptr<OlapTableSchemaParam> schema, LoadStreamMgr* load_stream_mgr,
RuntimeProfile* profile);

Expand Down Expand Up @@ -116,7 +116,7 @@ using IndexStreamSharedPtr = std::shared_ptr<IndexStream>;
using StreamId = brpc::StreamId;
class LoadStream : public brpc::StreamInputHandler {
public:
LoadStream(PUniqueId load_id, LoadStreamMgr* load_stream_mgr, bool enable_profile);
LoadStream(const PUniqueId& load_id, LoadStreamMgr* load_stream_mgr, bool enable_profile);
~LoadStream() override;

Status init(const POpenLoadStreamRequest* request);
Expand Down
Loading