Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 8 additions & 25 deletions be/src/vec/sink/load_stream_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,19 +125,13 @@ inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamReplyHandler
return ostr;
}

LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use)
: _use_cnt(num_use),
_load_id(load_id),
LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id,
std::shared_ptr<IndexToTabletSchema> schema_map,
std::shared_ptr<IndexToEnableMoW> mow_map)
: _load_id(load_id),
_src_id(src_id),
_tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()),
_enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()) {};

LoadStreamStub::LoadStreamStub(LoadStreamStub& stub)
: _use_cnt(stub._use_cnt.load()),
_load_id(stub._load_id),
_src_id(stub._src_id),
_tablet_schema_for_index(stub._tablet_schema_for_index),
_enable_unique_mow_for_index(stub._enable_unique_mow_for_index) {};
_tablet_schema_for_index(schema_map),
_enable_unique_mow_for_index(mow_map) {};

LoadStreamStub::~LoadStreamStub() {
if (_is_init.load() && !_is_closed.load()) {
Expand Down Expand Up @@ -241,23 +235,12 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64

// CLOSE_LOAD
Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commit) {
{
std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
_tablets_to_commit.insert(_tablets_to_commit.end(), tablets_to_commit.begin(),
tablets_to_commit.end());
}
if (--_use_cnt > 0) {
return Status::OK();
}
PStreamHeader header;
*header.mutable_load_id() = _load_id;
header.set_src_id(_src_id);
header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
{
std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
for (const auto& tablet : _tablets_to_commit) {
*header.add_tablets() = tablet;
}
for (const auto& tablet : tablets_to_commit) {
*header.add_tablets() = tablet;
}
return _encode_and_send(header);
}
Expand Down
14 changes: 7 additions & 7 deletions be/src/vec/sink/load_stream_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,14 @@ class LoadStreamStub {

public:
// construct new stub
LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use);
LoadStreamStub(PUniqueId load_id, int64_t src_id,
std::shared_ptr<IndexToTabletSchema> schema_map,
std::shared_ptr<IndexToEnableMoW> mow_map);

// copy constructor, shared_ptr members are shared
LoadStreamStub(LoadStreamStub& stub);
LoadStreamStub(UniqueId load_id, int64_t src_id,
std::shared_ptr<IndexToTabletSchema> schema_map,
std::shared_ptr<IndexToEnableMoW> mow_map)
: LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map) {};

// for mock this class in UT
#ifdef BE_TEST
Expand Down Expand Up @@ -213,7 +217,6 @@ class LoadStreamStub {
std::atomic<bool> _is_closed;
std::atomic<bool> _is_cancelled;
std::atomic<bool> _is_eos;
std::atomic<int> _use_cnt;

PUniqueId _load_id;
brpc::StreamId _stream_id;
Expand All @@ -226,9 +229,6 @@ class LoadStreamStub {
bthread::Mutex _cancel_mutex;
bthread::ConditionVariable _close_cv;

std::mutex _tablets_to_commit_mutex;
std::vector<PTabletID> _tablets_to_commit;

std::mutex _buffer_mutex;
std::mutex _send_mutex;
butil::IOBuf _buffer;
Expand Down
110 changes: 82 additions & 28 deletions be/src/vec/sink/load_stream_stub_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,50 +23,104 @@
namespace doris {
class TExpr;

LoadStreams::LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, LoadStreamStubPool* pool)
: _load_id(load_id), _dst_id(dst_id), _use_cnt(num_use), _pool(pool) {}
LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use,
LoadStreamStubPool* pool)
: _load_id(load_id),
_src_id(src_id),
_num_streams(num_streams),
_use_cnt(num_use),
_pool(pool) {
DCHECK(num_streams > 0) << "stream num should be greater than 0";
DCHECK(num_use > 0) << "use num should be greater than 0";
}

std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id) {
std::lock_guard<std::mutex> lock(_mutex);
std::shared_ptr<Streams> streams = _streams_for_node[dst_id];
if (streams != nullptr) {
return streams;
}
streams = std::make_shared<Streams>();
auto schema_map = std::make_shared<IndexToTabletSchema>();
auto mow_map = std::make_shared<IndexToEnableMoW>();
for (int i = 0; i < _num_streams; i++) {
streams->emplace_back(new LoadStreamStub(_load_id, _src_id, schema_map, mow_map));
}
_streams_for_node[dst_id] = streams;
return streams;
}

std::shared_ptr<Streams> LoadStreamMap::at(int64_t dst_id) {
std::lock_guard<std::mutex> lock(_mutex);
return _streams_for_node.at(dst_id);
}

bool LoadStreamMap::contains(int64_t dst_id) {
std::lock_guard<std::mutex> lock(_mutex);
return _streams_for_node.contains(dst_id);
}

void LoadStreams::release() {
void LoadStreamMap::for_each(std::function<void(int64_t, const Streams&)> fn) {
std::lock_guard<std::mutex> lock(_mutex);
for (auto& [dst_id, streams] : _streams_for_node) {
fn(dst_id, *streams);
}
}

Status LoadStreamMap::for_each_st(std::function<Status(int64_t, const Streams&)> fn) {
std::lock_guard<std::mutex> lock(_mutex);
for (auto& [dst_id, streams] : _streams_for_node) {
RETURN_IF_ERROR(fn(dst_id, *streams));
}
return Status::OK();
}

void LoadStreamMap::save_tablets_to_commit(int64_t dst_id,
const std::vector<PTabletID>& tablets_to_commit) {
std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
auto& tablets = _tablets_to_commit[dst_id];
tablets.insert(tablets.end(), tablets_to_commit.begin(), tablets_to_commit.end());
}

bool LoadStreamMap::release() {
int num_use = --_use_cnt;
DBUG_EXECUTE_IF("LoadStreams.release.keeping_streams", { num_use = 1; });
if (num_use == 0) {
LOG(INFO) << "releasing streams, load_id=" << _load_id << ", dst_id=" << _dst_id;
_pool->erase(_load_id, _dst_id);
} else {
LOG(INFO) << "keeping streams, load_id=" << _load_id << ", dst_id=" << _dst_id
<< ", use_cnt=" << num_use;
LOG(INFO) << "releasing streams, load_id=" << _load_id;
_pool->erase(_load_id);
return true;
}
LOG(INFO) << "keeping streams, load_id=" << _load_id << ", use_cnt=" << num_use;
return false;
}

Status LoadStreamMap::close_load() {
return for_each_st([this](int64_t dst_id, const Streams& streams) -> Status {
const auto& tablets = _tablets_to_commit[dst_id];
for (auto& stream : streams) {
RETURN_IF_ERROR(stream->close_load(tablets));
}
return Status::OK();
});
}

LoadStreamStubPool::LoadStreamStubPool() = default;

LoadStreamStubPool::~LoadStreamStubPool() = default;

std::shared_ptr<LoadStreams> LoadStreamStubPool::get_or_create(PUniqueId load_id, int64_t src_id,
int64_t dst_id, int num_streams,
int num_sink) {
auto key = std::make_pair(UniqueId(load_id), dst_id);
std::shared_ptr<LoadStreamMap> LoadStreamStubPool::get_or_create(UniqueId load_id, int64_t src_id,
int num_streams, int num_use) {
std::lock_guard<std::mutex> lock(_mutex);
std::shared_ptr<LoadStreams> streams = _pool[key];
if (streams) {
std::shared_ptr<LoadStreamMap> streams = _pool[load_id];
if (streams != nullptr) {
return streams;
}
DCHECK(num_streams > 0) << "stream num should be greater than 0";
DCHECK(num_sink > 0) << "sink num should be greater than 0";
auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub {load_id, src_id, num_sink});
streams = std::make_shared<LoadStreams>(load_id, dst_id, num_sink, this);
for (int32_t i = 0; i < num_streams; i++) {
// copy construct, internal tablet schema map will be shared among all stubs
streams->streams().emplace_back(new LoadStreamStub {*it->second});
}
_pool[key] = streams;
streams = std::make_shared<LoadStreamMap>(load_id, src_id, num_streams, num_use, this);
_pool[load_id] = streams;
return streams;
}

void LoadStreamStubPool::erase(UniqueId load_id, int64_t dst_id) {
void LoadStreamStubPool::erase(UniqueId load_id) {
std::lock_guard<std::mutex> lock(_mutex);
_pool.erase(std::make_pair(load_id, dst_id));
_template_stubs.erase(load_id);
_pool.erase(load_id);
}

} // namespace doris
50 changes: 32 additions & 18 deletions be/src/vec/sink/load_stream_stub_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,41 @@ class LoadStreamStubPool;

using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;

class LoadStreams {
class LoadStreamMap {
public:
LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, LoadStreamStubPool* pool);
LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use,
LoadStreamStubPool* pool);

void release();
std::shared_ptr<Streams> get_or_create(int64_t dst_id);

Streams& streams() { return _streams; }
std::shared_ptr<Streams> at(int64_t dst_id);

bool contains(int64_t dst_id);

void for_each(std::function<void(int64_t, const Streams&)> fn);

Status for_each_st(std::function<Status(int64_t, const Streams&)> fn);

void save_tablets_to_commit(int64_t dst_id, const std::vector<PTabletID>& tablets_to_commit);

// Return true if the last instance is just released.
bool release();

// send CLOSE_LOAD to all streams, return ERROR if any.
// only call this method after release() returns true.
Status close_load();

private:
Streams _streams;
UniqueId _load_id;
int64_t _dst_id;
const UniqueId _load_id;
const int64_t _src_id;
const int _num_streams;
std::atomic<int> _use_cnt;
std::mutex _mutex;
std::unordered_map<int64_t, std::shared_ptr<Streams>> _streams_for_node;
LoadStreamStubPool* _pool = nullptr;

std::mutex _tablets_to_commit_mutex;
std::unordered_map<int64_t, std::vector<PTabletID>> _tablets_to_commit;
};

class LoadStreamStubPool {
Expand All @@ -94,26 +115,19 @@ class LoadStreamStubPool {

~LoadStreamStubPool();

std::shared_ptr<LoadStreams> get_or_create(PUniqueId load_id, int64_t src_id, int64_t dst_id,
int num_streams, int num_sink);
std::shared_ptr<LoadStreamMap> get_or_create(UniqueId load_id, int64_t src_id, int num_streams,
int num_use);

void erase(UniqueId load_id, int64_t dst_id);
void erase(UniqueId load_id);

size_t size() {
std::lock_guard<std::mutex> lock(_mutex);
return _pool.size();
}

// for UT only
size_t templates_size() {
std::lock_guard<std::mutex> lock(_mutex);
return _template_stubs.size();
}

private:
std::mutex _mutex;
std::unordered_map<UniqueId, std::unique_ptr<LoadStreamStub>> _template_stubs;
std::unordered_map<std::pair<UniqueId, int64_t>, std::shared_ptr<LoadStreams>> _pool;
std::unordered_map<UniqueId, std::shared_ptr<LoadStreamMap>> _pool;
};

} // namespace doris
Loading