Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 16 additions & 34 deletions be/src/vec/sink/load_stream_map_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,20 @@ LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams,
DCHECK(num_use > 0) << "use num should be greater than 0";
}

std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id, bool incremental) {
std::shared_ptr<LoadStreamStubs> LoadStreamMap::get_or_create(int64_t dst_id, bool incremental) {
std::lock_guard<std::mutex> lock(_mutex);
std::shared_ptr<Streams> streams = _streams_for_node[dst_id];
std::shared_ptr<LoadStreamStubs> streams = _streams_for_node[dst_id];
if (streams != nullptr) {
return streams;
}
streams = std::make_shared<Streams>();
for (int i = 0; i < _num_streams; i++) {
streams->emplace_back(new LoadStreamStub(_load_id, _src_id, _tablet_schema_for_index,
_enable_unique_mow_for_index, incremental));
}
streams = std::make_shared<LoadStreamStubs>(_num_streams, _load_id, _src_id,
_tablet_schema_for_index,
_enable_unique_mow_for_index, incremental);
_streams_for_node[dst_id] = streams;
return streams;
}

std::shared_ptr<Streams> LoadStreamMap::at(int64_t dst_id) {
std::shared_ptr<LoadStreamStubs> LoadStreamMap::at(int64_t dst_id) {
std::lock_guard<std::mutex> lock(_mutex);
return _streams_for_node.at(dst_id);
}
Expand All @@ -60,7 +58,7 @@ bool LoadStreamMap::contains(int64_t dst_id) {
return _streams_for_node.contains(dst_id);
}

void LoadStreamMap::for_each(std::function<void(int64_t, const Streams&)> fn) {
void LoadStreamMap::for_each(std::function<void(int64_t, LoadStreamStubs&)> fn) {
decltype(_streams_for_node) snapshot;
{
std::lock_guard<std::mutex> lock(_mutex);
Expand All @@ -71,7 +69,7 @@ void LoadStreamMap::for_each(std::function<void(int64_t, const Streams&)> fn) {
}
}

Status LoadStreamMap::for_each_st(std::function<Status(int64_t, const Streams&)> fn) {
Status LoadStreamMap::for_each_st(std::function<Status(int64_t, LoadStreamStubs&)> fn) {
decltype(_streams_for_node) snapshot;
{
std::lock_guard<std::mutex> lock(_mutex);
Expand Down Expand Up @@ -108,38 +106,22 @@ bool LoadStreamMap::release() {
}

void LoadStreamMap::close_load(bool incremental) {
auto st = for_each_st([this, incremental](int64_t dst_id, const Streams& streams) -> Status {
for (auto& [dst_id, streams] : _streams_for_node) {
if (streams->is_incremental()) {
continue;
}
std::vector<PTabletID> tablets_to_commit;
const auto& tablets = _tablets_to_commit[dst_id];
tablets_to_commit.reserve(tablets.size());
for (const auto& [tablet_id, tablet] : tablets) {
tablets_to_commit.push_back(tablet);
tablets_to_commit.back().set_num_segments(_segments_for_tablet[tablet_id]);
}
Status status = Status::OK();
bool first = true;
for (auto& stream : streams) {
if (stream->is_incremental() != incremental) {
continue;
}
if (first) {
auto st = stream->close_load(tablets_to_commit);
if (!st.ok() && status.ok()) {
status = st;
}
first = false;
} else {
auto st = stream->close_load({});
if (!st.ok() && status.ok()) {
status = st;
}
}
auto st = streams->close_load(tablets_to_commit);
if (!st.ok()) {
LOG(WARNING) << "close_load for " << (incremental ? "incremental" : "non-incremental")
<< " streams failed: " << st << ", load_id=" << _load_id;
}
return status;
});
if (!st.ok()) {
LOG(WARNING) << "close_load for " << (incremental ? "incremental" : "non-incremental")
<< " streams failed: " << st << ", load_id=" << _load_id;
}
}

Expand Down
12 changes: 5 additions & 7 deletions be/src/vec/sink/load_stream_map_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,20 @@ class LoadStreamStub;

class LoadStreamMapPool;

using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;

class LoadStreamMap {
public:
LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use,
LoadStreamMapPool* pool);

std::shared_ptr<Streams> get_or_create(int64_t dst_id, bool incremental = false);
std::shared_ptr<LoadStreamStubs> get_or_create(int64_t dst_id, bool incremental = false);

std::shared_ptr<Streams> at(int64_t dst_id);
std::shared_ptr<LoadStreamStubs> at(int64_t dst_id);

bool contains(int64_t dst_id);

void for_each(std::function<void(int64_t, const Streams&)> fn);
void for_each(std::function<void(int64_t, LoadStreamStubs&)> fn);

Status for_each_st(std::function<Status(int64_t, const Streams&)> fn);
Status for_each_st(std::function<Status(int64_t, LoadStreamStubs&)> fn);

void save_tablets_to_commit(int64_t dst_id, const std::vector<PTabletID>& tablets_to_commit);

Expand All @@ -107,7 +105,7 @@ class LoadStreamMap {
const int _num_streams;
std::atomic<int> _use_cnt;
std::mutex _mutex;
std::unordered_map<int64_t, std::shared_ptr<Streams>> _streams_for_node;
std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>> _streams_for_node;
LoadStreamMapPool* _pool = nullptr;
std::shared_ptr<IndexToTabletSchema> _tablet_schema_for_index;
std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index;
Expand Down
66 changes: 65 additions & 1 deletion be/src/vec/sink/load_stream_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
LOG(INFO) << "open load stream to host=" << node_info.host << ", port=" << node_info.brpc_port
<< ", " << *this;
_is_open.store(true);
return Status::OK();
_status = Status::OK();
return _status;
}

// APPEND_DATA
Expand Down Expand Up @@ -504,4 +505,67 @@ inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub& stub)
return ostr;
}

Status LoadStreamStubs::open(BrpcClientCache<PBackendService_Stub>* client_cache,
const NodeInfo& node_info, int64_t txn_id,
const OlapTableSchemaParam& schema,
const std::vector<PTabletID>& tablets_for_schema, int total_streams,
int64_t idle_timeout_ms, bool enable_profile) {
bool get_schema = true;
auto status = Status::OK();
for (auto& stream : _streams) {
Status st;
if (get_schema) {
st = stream->open(client_cache, node_info, txn_id, schema, tablets_for_schema,
total_streams, idle_timeout_ms, enable_profile);
} else {
st = stream->open(client_cache, node_info, txn_id, schema, {}, total_streams,
idle_timeout_ms, enable_profile);
}
if (st.ok()) {
get_schema = false;
} else {
LOG(WARNING) << "open stream failed: " << st << "; stream: " << *stream;
status = st;
// no break here to try get schema from the rest streams
}
}
// only mark open when all streams open success
_open_success.store(status.ok());
// cancel all streams if open failed
if (!status.ok()) {
cancel(status);
}
return status;
}

Status LoadStreamStubs::close_load(const std::vector<PTabletID>& tablets_to_commit) {
if (!_open_success.load()) {
return Status::InternalError("streams not open");
}
bool first = true;
auto status = Status::OK();
for (auto& stream : _streams) {
Status st;
if (first) {
st = stream->close_load(tablets_to_commit);
first = false;
} else {
st = stream->close_load({});
}
if (!st.ok()) {
LOG(WARNING) << "close_load failed: " << st << "; stream: " << *stream;
}
}
return status;
}

Status LoadStreamStubs::close_wait(RuntimeState* state, int64_t timeout_ms) {
MonotonicStopWatch watch;
watch.start();
for (auto& stream : _streams) {
RETURN_IF_ERROR(stream->close_wait(state, timeout_ms - watch.elapsed_time() / 1000 / 1000));
}
return Status::OK();
}

} // namespace doris
69 changes: 69 additions & 0 deletions be/src/vec/sink/load_stream_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,4 +266,73 @@ class LoadStreamStub : public std::enable_shared_from_this<LoadStreamStub> {
bool _is_incremental = false;
};

// a collection of LoadStreams connect to the same node
class LoadStreamStubs {
public:
LoadStreamStubs(size_t num_streams, UniqueId load_id, int64_t src_id,
std::shared_ptr<IndexToTabletSchema> schema_map,
std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false)
: _is_incremental(incremental) {
_streams.reserve(num_streams);
for (size_t i = 0; i < num_streams; i++) {
_streams.emplace_back(
new LoadStreamStub(load_id, src_id, schema_map, mow_map, incremental));
}
}

Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info,
int64_t txn_id, const OlapTableSchemaParam& schema,
const std::vector<PTabletID>& tablets_for_schema, int total_streams,
int64_t idle_timeout_ms, bool enable_profile);

bool is_incremental() const { return _is_incremental; }

size_t size() const { return _streams.size(); }

// for UT only
void mark_open() { _open_success.store(true); }

std::shared_ptr<LoadStreamStub> select_one_stream() {
if (!_open_success.load()) {
return nullptr;
}
size_t i = _select_index.fetch_add(1);
return _streams[i % _streams.size()];
}

void cancel(Status reason) {
for (auto& stream : _streams) {
stream->cancel(reason);
}
}

Status close_load(const std::vector<PTabletID>& tablets_to_commit);

Status close_wait(RuntimeState* state, int64_t timeout_ms = 0);

std::unordered_set<int64_t> success_tablets() {
std::unordered_set<int64_t> s;
for (auto& stream : _streams) {
auto v = stream->success_tablets();
std::copy(v.begin(), v.end(), std::inserter(s, s.end()));
}
return s;
}

std::unordered_map<int64_t, Status> failed_tablets() {
std::unordered_map<int64_t, Status> m;
for (auto& stream : _streams) {
auto v = stream->failed_tablets();
m.insert(v.begin(), v.end());
}
return m;
}

private:
std::vector<std::shared_ptr<LoadStreamStub>> _streams;
std::atomic<bool> _open_success = false;
std::atomic<size_t> _select_index = 0;
const bool _is_incremental;
};

} // namespace doris
Loading