From ee63f4cd6f012b1ed569f25e6b4d8f41b62d0a09 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Wed, 3 Apr 2024 19:44:52 +0800 Subject: [PATCH] [fix](move-memtable) fix initial use count of streams for auto partition (#33165) --- be/src/vec/sink/load_stream_stub.cpp | 33 +-- be/src/vec/sink/load_stream_stub.h | 14 +- be/src/vec/sink/load_stream_stub_pool.cpp | 110 +++++++--- be/src/vec/sink/load_stream_stub_pool.h | 50 +++-- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 192 +++++++++--------- be/src/vec/sink/writer/vtablet_writer_v2.h | 10 +- .../io/fs/stream_sink_file_writer_test.cpp | 4 +- .../vec/exec/load_stream_stub_pool_test.cpp | 31 ++- 8 files changed, 256 insertions(+), 188 deletions(-) diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 6eb91e46853d3e..7708e3b255fe61 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -125,19 +125,13 @@ inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamReplyHandler return ostr; } -LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use) - : _use_cnt(num_use), - _load_id(load_id), +LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, + std::shared_ptr schema_map, + std::shared_ptr mow_map) + : _load_id(load_id), _src_id(src_id), - _tablet_schema_for_index(std::make_shared()), - _enable_unique_mow_for_index(std::make_shared()) {}; - -LoadStreamStub::LoadStreamStub(LoadStreamStub& stub) - : _use_cnt(stub._use_cnt.load()), - _load_id(stub._load_id), - _src_id(stub._src_id), - _tablet_schema_for_index(stub._tablet_schema_for_index), - _enable_unique_mow_for_index(stub._enable_unique_mow_for_index) {}; + _tablet_schema_for_index(schema_map), + _enable_unique_mow_for_index(mow_map) {}; LoadStreamStub::~LoadStreamStub() { if (_is_init.load() && !_is_closed.load()) { @@ -241,23 +235,12 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64 // CLOSE_LOAD Status LoadStreamStub::close_load(const std::vector& tablets_to_commit) { - { - std::lock_guard lock(_tablets_to_commit_mutex); - _tablets_to_commit.insert(_tablets_to_commit.end(), tablets_to_commit.begin(), - tablets_to_commit.end()); - } - if (--_use_cnt > 0) { - return Status::OK(); - } PStreamHeader header; *header.mutable_load_id() = _load_id; header.set_src_id(_src_id); header.set_opcode(doris::PStreamHeader::CLOSE_LOAD); - { - std::lock_guard lock(_tablets_to_commit_mutex); - for (const auto& tablet : _tablets_to_commit) { - *header.add_tablets() = tablet; - } + for (const auto& tablet : tablets_to_commit) { + *header.add_tablets() = tablet; } return _encode_and_send(header); } diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index f20b0e6ea3d7c5..aa8b850760e793 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -109,10 +109,14 @@ class LoadStreamStub { public: // construct new stub - LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use); + LoadStreamStub(PUniqueId load_id, int64_t src_id, + std::shared_ptr schema_map, + std::shared_ptr mow_map); - // copy constructor, shared_ptr members are shared - LoadStreamStub(LoadStreamStub& stub); + LoadStreamStub(UniqueId load_id, int64_t src_id, + std::shared_ptr schema_map, + std::shared_ptr mow_map) + : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map) {}; // for mock this class in UT #ifdef BE_TEST @@ -213,7 +217,6 @@ class LoadStreamStub { std::atomic _is_closed; std::atomic _is_cancelled; std::atomic _is_eos; - std::atomic _use_cnt; PUniqueId _load_id; brpc::StreamId _stream_id; @@ -226,9 +229,6 @@ class LoadStreamStub { bthread::Mutex _cancel_mutex; bthread::ConditionVariable _close_cv; - std::mutex _tablets_to_commit_mutex; - std::vector _tablets_to_commit; - std::mutex _buffer_mutex; std::mutex _send_mutex; butil::IOBuf _buffer; diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp b/be/src/vec/sink/load_stream_stub_pool.cpp index d76402b57d5020..3eae49aff7717a 100644 --- a/be/src/vec/sink/load_stream_stub_pool.cpp +++ b/be/src/vec/sink/load_stream_stub_pool.cpp @@ -23,50 +23,104 @@ namespace doris { class TExpr; -LoadStreams::LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, LoadStreamStubPool* pool) - : _load_id(load_id), _dst_id(dst_id), _use_cnt(num_use), _pool(pool) {} +LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use, + LoadStreamStubPool* pool) + : _load_id(load_id), + _src_id(src_id), + _num_streams(num_streams), + _use_cnt(num_use), + _pool(pool) { + DCHECK(num_streams > 0) << "stream num should be greater than 0"; + DCHECK(num_use > 0) << "use num should be greater than 0"; +} + +std::shared_ptr LoadStreamMap::get_or_create(int64_t dst_id) { + std::lock_guard lock(_mutex); + std::shared_ptr streams = _streams_for_node[dst_id]; + if (streams != nullptr) { + return streams; + } + streams = std::make_shared(); + auto schema_map = std::make_shared(); + auto mow_map = std::make_shared(); + for (int i = 0; i < _num_streams; i++) { + streams->emplace_back(new LoadStreamStub(_load_id, _src_id, schema_map, mow_map)); + } + _streams_for_node[dst_id] = streams; + return streams; +} + +std::shared_ptr LoadStreamMap::at(int64_t dst_id) { + std::lock_guard lock(_mutex); + return _streams_for_node.at(dst_id); +} + +bool LoadStreamMap::contains(int64_t dst_id) { + std::lock_guard lock(_mutex); + return _streams_for_node.contains(dst_id); +} -void LoadStreams::release() { +void LoadStreamMap::for_each(std::function fn) { + std::lock_guard lock(_mutex); + for (auto& [dst_id, streams] : _streams_for_node) { + fn(dst_id, *streams); + } +} + +Status LoadStreamMap::for_each_st(std::function fn) { + std::lock_guard lock(_mutex); + for (auto& [dst_id, streams] : _streams_for_node) { + RETURN_IF_ERROR(fn(dst_id, *streams)); + } + return Status::OK(); +} + +void LoadStreamMap::save_tablets_to_commit(int64_t dst_id, + const std::vector& tablets_to_commit) { + std::lock_guard lock(_tablets_to_commit_mutex); + auto& tablets = _tablets_to_commit[dst_id]; + tablets.insert(tablets.end(), tablets_to_commit.begin(), tablets_to_commit.end()); +} + +bool LoadStreamMap::release() { int num_use = --_use_cnt; - DBUG_EXECUTE_IF("LoadStreams.release.keeping_streams", { num_use = 1; }); if (num_use == 0) { - LOG(INFO) << "releasing streams, load_id=" << _load_id << ", dst_id=" << _dst_id; - _pool->erase(_load_id, _dst_id); - } else { - LOG(INFO) << "keeping streams, load_id=" << _load_id << ", dst_id=" << _dst_id - << ", use_cnt=" << num_use; + LOG(INFO) << "releasing streams, load_id=" << _load_id; + _pool->erase(_load_id); + return true; } + LOG(INFO) << "keeping streams, load_id=" << _load_id << ", use_cnt=" << num_use; + return false; +} + +Status LoadStreamMap::close_load() { + return for_each_st([this](int64_t dst_id, const Streams& streams) -> Status { + const auto& tablets = _tablets_to_commit[dst_id]; + for (auto& stream : streams) { + RETURN_IF_ERROR(stream->close_load(tablets)); + } + return Status::OK(); + }); } LoadStreamStubPool::LoadStreamStubPool() = default; LoadStreamStubPool::~LoadStreamStubPool() = default; - -std::shared_ptr LoadStreamStubPool::get_or_create(PUniqueId load_id, int64_t src_id, - int64_t dst_id, int num_streams, - int num_sink) { - auto key = std::make_pair(UniqueId(load_id), dst_id); +std::shared_ptr LoadStreamStubPool::get_or_create(UniqueId load_id, int64_t src_id, + int num_streams, int num_use) { std::lock_guard lock(_mutex); - std::shared_ptr streams = _pool[key]; - if (streams) { + std::shared_ptr streams = _pool[load_id]; + if (streams != nullptr) { return streams; } - DCHECK(num_streams > 0) << "stream num should be greater than 0"; - DCHECK(num_sink > 0) << "sink num should be greater than 0"; - auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub {load_id, src_id, num_sink}); - streams = std::make_shared(load_id, dst_id, num_sink, this); - for (int32_t i = 0; i < num_streams; i++) { - // copy construct, internal tablet schema map will be shared among all stubs - streams->streams().emplace_back(new LoadStreamStub {*it->second}); - } - _pool[key] = streams; + streams = std::make_shared(load_id, src_id, num_streams, num_use, this); + _pool[load_id] = streams; return streams; } -void LoadStreamStubPool::erase(UniqueId load_id, int64_t dst_id) { +void LoadStreamStubPool::erase(UniqueId load_id) { std::lock_guard lock(_mutex); - _pool.erase(std::make_pair(load_id, dst_id)); - _template_stubs.erase(load_id); + _pool.erase(load_id); } } // namespace doris diff --git a/be/src/vec/sink/load_stream_stub_pool.h b/be/src/vec/sink/load_stream_stub_pool.h index 662fc5bc1a143d..65f3bb66cd2236 100644 --- a/be/src/vec/sink/load_stream_stub_pool.h +++ b/be/src/vec/sink/load_stream_stub_pool.h @@ -72,20 +72,41 @@ class LoadStreamStubPool; using Streams = std::vector>; -class LoadStreams { +class LoadStreamMap { public: - LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, LoadStreamStubPool* pool); + LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use, + LoadStreamStubPool* pool); - void release(); + std::shared_ptr get_or_create(int64_t dst_id); - Streams& streams() { return _streams; } + std::shared_ptr at(int64_t dst_id); + + bool contains(int64_t dst_id); + + void for_each(std::function fn); + + Status for_each_st(std::function fn); + + void save_tablets_to_commit(int64_t dst_id, const std::vector& tablets_to_commit); + + // Return true if the last instance is just released. + bool release(); + + // send CLOSE_LOAD to all streams, return ERROR if any. + // only call this method after release() returns true. + Status close_load(); private: - Streams _streams; - UniqueId _load_id; - int64_t _dst_id; + const UniqueId _load_id; + const int64_t _src_id; + const int _num_streams; std::atomic _use_cnt; + std::mutex _mutex; + std::unordered_map> _streams_for_node; LoadStreamStubPool* _pool = nullptr; + + std::mutex _tablets_to_commit_mutex; + std::unordered_map> _tablets_to_commit; }; class LoadStreamStubPool { @@ -94,26 +115,19 @@ class LoadStreamStubPool { ~LoadStreamStubPool(); - std::shared_ptr get_or_create(PUniqueId load_id, int64_t src_id, int64_t dst_id, - int num_streams, int num_sink); + std::shared_ptr get_or_create(UniqueId load_id, int64_t src_id, int num_streams, + int num_use); - void erase(UniqueId load_id, int64_t dst_id); + void erase(UniqueId load_id); size_t size() { std::lock_guard lock(_mutex); return _pool.size(); } - // for UT only - size_t templates_size() { - std::lock_guard lock(_mutex); - return _template_stubs.size(); - } - private: std::mutex _mutex; - std::unordered_map> _template_stubs; - std::unordered_map, std::shared_ptr> _pool; + std::unordered_map> _pool; }; } // namespace doris diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 68506ca161e015..315df063ea03a6 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -98,7 +98,7 @@ Status VTabletWriterV2::_incremental_open_streams( tablet.set_partition_id(partition->id); tablet.set_index_id(index.index_id); tablet.set_tablet_id(tablet_id); - if (!_streams_for_node.contains(node)) { + if (!_streams_for_node->contains(node)) { new_backends.insert(node); } _tablets_for_node[node].emplace(tablet_id, tablet); @@ -111,11 +111,9 @@ Status VTabletWriterV2::_incremental_open_streams( } } } - for (int64_t node_id : new_backends) { - auto load_streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create( - _load_id, _backend_id, node_id, _stream_per_node, _num_local_sink); - RETURN_IF_ERROR(_open_streams_to_backend(node_id, *load_streams)); - _streams_for_node[node_id] = load_streams; + for (int64_t dst_id : new_backends) { + auto streams = _streams_for_node->get_or_create(dst_id); + RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams)); } return Status::OK(); } @@ -242,6 +240,8 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) { } else { _delta_writer_for_tablet = std::make_shared(_load_id); } + _streams_for_node = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create( + _load_id, _backend_id, _stream_per_node, _num_local_sink); return Status::OK(); } @@ -253,23 +253,21 @@ Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); RETURN_IF_ERROR(_build_tablet_node_mapping()); - RETURN_IF_ERROR(_open_streams(_backend_id)); + RETURN_IF_ERROR(_open_streams()); RETURN_IF_ERROR(_init_row_distribution()); return Status::OK(); } -Status VTabletWriterV2::_open_streams(int64_t src_id) { +Status VTabletWriterV2::_open_streams() { for (auto& [dst_id, _] : _tablets_for_node) { - auto streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create( - _load_id, src_id, dst_id, _stream_per_node, _num_local_sink); + auto streams = _streams_for_node->get_or_create(dst_id); RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams)); - _streams_for_node[dst_id] = streams; } return Status::OK(); } -Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, LoadStreams& streams) { +Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, Streams& streams) { const auto* node_info = _nodes_info->find_node(dst_id); DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.node_info_null", { node_info = nullptr; }); @@ -278,14 +276,14 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, LoadStreams& st } auto idle_timeout_ms = _state->execution_timeout() * 1000; // get tablet schema from each backend only in the 1st stream - for (auto& stream : streams.streams() | std::ranges::views::take(1)) { + for (auto& stream : streams | std::ranges::views::take(1)) { const std::vector& tablets_for_schema = _indexes_from_node[node_info->id]; RETURN_IF_ERROR(stream->open(stream, _state->exec_env()->brpc_internal_client_cache(), *node_info, _txn_id, *_schema, tablets_for_schema, _total_streams, idle_timeout_ms, _state->enable_profile())); } // for the rest streams, open without getting tablet schema - for (auto& stream : streams.streams() | std::ranges::views::drop(1)) { + for (auto& stream : streams | std::ranges::views::drop(1)) { RETURN_IF_ERROR(stream->open(stream, _state->exec_env()->brpc_internal_client_cache(), *node_info, _txn_id, *_schema, {}, _total_streams, idle_timeout_ms, _state->enable_profile())); @@ -360,7 +358,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id, tablet.set_index_id(index_id); tablet.set_tablet_id(tablet_id); _tablets_for_node[node_id].emplace(tablet_id, tablet); - streams.emplace_back(_streams_for_node.at(node_id)->streams().at(_stream_index)); + streams.emplace_back(_streams_for_node->at(node_id)->at(_stream_index)); RETURN_IF_ERROR(streams[0]->wait_for_schema(partition_id, index_id, tablet_id)); } _stream_index = (_stream_index + 1) % _stream_per_node; @@ -469,11 +467,13 @@ Status VTabletWriterV2::_cancel(Status status) { _delta_writer_for_tablet->cancel(status); _delta_writer_for_tablet.reset(); } - for (const auto& [_, streams] : _streams_for_node) { - for (const auto& stream : streams->streams()) { - stream->cancel(status); - } - streams->release(); + if (_streams_for_node) { + _streams_for_node->for_each([status](int64_t dst_id, const Streams& streams) { + for (auto& stream : streams) { + stream->cancel(status); + } + }); + _streams_for_node->release(); } return Status::OK(); } @@ -527,87 +527,83 @@ Status VTabletWriterV2::close(Status exec_status) { COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time()); COUNTER_SET(_validate_data_timer, _block_convertor->validate_data_ns()); - // defer stream release to prevent memory leak - Defer defer([&] { - for (const auto& [_, streams] : _streams_for_node) { - streams->release(); - } - _streams_for_node.clear(); - }); - + // close DeltaWriters { SCOPED_TIMER(_close_writer_timer); // close all delta writers if this is the last user - RETURN_IF_ERROR(_delta_writer_for_tablet->close(_profile)); + auto st = _delta_writer_for_tablet->close(_profile); _delta_writer_for_tablet.reset(); - } - - { - // send CLOSE_LOAD to all streams, return ERROR if any - for (const auto& [_, streams] : _streams_for_node) { - RETURN_IF_ERROR(_close_load(streams->streams())); + if (!st.ok()) { + RETURN_IF_ERROR(_cancel(st)); } } - { + _calc_tablets_to_commit(); + const bool is_last_sink = _streams_for_node->release(); + LOG(INFO) << "sink " << _sender_id << " released streams, is_last=" << is_last_sink + << ", load_id=" << print_id(_load_id); + + // send CLOSE_LOAD and close_wait on all streams + if (is_last_sink) { + RETURN_IF_ERROR(_streams_for_node->close_load()); SCOPED_TIMER(_close_load_timer); - for (const auto& [_, streams] : _streams_for_node) { - for (const auto& stream : streams->streams()) { - int64_t remain_ms = static_cast(_state->execution_timeout()) * 1000 - - _timeout_watch.elapsed_time() / 1000 / 1000; - if (remain_ms <= 0) { - LOG(WARNING) << "load timed out before close waiting, load_id=" - << print_id(_load_id); - return Status::TimedOut("load timed out before close waiting"); - } - RETURN_IF_ERROR(stream->close_wait(_state, remain_ms)); - } - } + RETURN_IF_ERROR(_streams_for_node->for_each_st( + [this](int64_t dst_id, const Streams& streams) -> Status { + for (auto& stream : streams) { + int64_t remain_ms = + static_cast(_state->execution_timeout()) * 1000 - + _timeout_watch.elapsed_time() / 1000 / 1000; + if (remain_ms <= 0) { + LOG(WARNING) << "load timed out before close waiting, load_id=" + << print_id(_load_id); + return Status::TimedOut("load timed out before close waiting"); + } + RETURN_IF_ERROR(stream->close_wait(_state, remain_ms)); + } + return Status::OK(); + })); } - std::unordered_map failed_tablets; + // calculate and submit commit info + if (is_last_sink) { + std::unordered_map failed_tablets; + std::unordered_map failed_reason; + std::vector tablet_commit_infos; - std::vector tablet_commit_infos; - for (const auto& [node_id, streams] : _streams_for_node) { - for (const auto& stream : streams->streams()) { + _streams_for_node->for_each([&](int64_t dst_id, const Streams& streams) { std::unordered_set known_tablets; - for (auto [tablet_id, _] : stream->failed_tablets()) { - if (known_tablets.contains(tablet_id)) { - continue; + for (const auto& stream : streams) { + for (auto [tablet_id, reason] : stream->failed_tablets()) { + if (known_tablets.contains(tablet_id)) { + continue; + } + known_tablets.insert(tablet_id); + failed_tablets[tablet_id]++; + failed_reason[tablet_id] = reason; } - known_tablets.insert(tablet_id); - failed_tablets[tablet_id]++; - } - for (auto tablet_id : stream->success_tablets()) { - if (known_tablets.contains(tablet_id)) { - continue; + for (auto tablet_id : stream->success_tablets()) { + if (known_tablets.contains(tablet_id)) { + continue; + } + known_tablets.insert(tablet_id); + TTabletCommitInfo commit_info; + commit_info.tabletId = tablet_id; + commit_info.backendId = dst_id; + tablet_commit_infos.emplace_back(std::move(commit_info)); } - known_tablets.insert(tablet_id); - TTabletCommitInfo commit_info; - commit_info.tabletId = tablet_id; - commit_info.backendId = node_id; - tablet_commit_infos.emplace_back(std::move(commit_info)); } - } - } - for (auto [tablet_id, replicas] : failed_tablets) { - if (replicas <= (_num_replicas - 1) / 2) { - continue; - } - auto backends = _location->find_tablet(tablet_id)->node_ids; - for (auto& backend_id : backends) { - for (const auto& stream : _streams_for_node[backend_id]->streams()) { - const auto& failed_tablets = stream->failed_tablets(); - if (failed_tablets.contains(tablet_id)) { - return failed_tablets.at(tablet_id); - } + }); + + for (auto [tablet_id, replicas] : failed_tablets) { + if (replicas > (_num_replicas - 1) / 2) { + return failed_reason.at(tablet_id); } } - DCHECK(false) << "failed tablet " << tablet_id << " should have failed reason"; + _state->tablet_commit_infos().insert( + _state->tablet_commit_infos().end(), + std::make_move_iterator(tablet_commit_infos.begin()), + std::make_move_iterator(tablet_commit_infos.end())); } - _state->tablet_commit_infos().insert(_state->tablet_commit_infos().end(), - std::make_move_iterator(tablet_commit_infos.begin()), - std::make_move_iterator(tablet_commit_infos.end())); // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node int64_t num_rows_load_total = _number_input_rows + _state->num_rows_load_filtered() + @@ -629,18 +625,28 @@ Status VTabletWriterV2::close(Status exec_status) { return status; } -Status VTabletWriterV2::_close_load(const Streams& streams) { - auto node_id = streams[0]->dst_id(); - std::vector tablets_to_commit; - for (auto [tablet_id, tablet] : _tablets_for_node[node_id]) { - if (_tablet_finder->partition_ids().contains(tablet.partition_id())) { - tablets_to_commit.push_back(tablet); +void VTabletWriterV2::_calc_tablets_to_commit() { + for (const auto& [dst_id, tablets] : _tablets_for_node) { + std::vector tablets_to_commit; + std::vector partition_ids; + for (const auto& [tablet_id, tablet] : tablets) { + if (_tablet_finder->partition_ids().contains(tablet.partition_id())) { + if (VLOG_DEBUG_IS_ON) { + partition_ids.push_back(tablet.partition_id()); + } + tablets_to_commit.push_back(tablet); + } } + if (VLOG_DEBUG_IS_ON) { + std::string msg("close load partitions: "); + msg.reserve(partition_ids.size() * 7); + for (auto v : partition_ids) { + msg.append(std::to_string(v) + ", "); + } + LOG(WARNING) << msg; + } + _streams_for_node->save_tablets_to_commit(dst_id, tablets_to_commit); } - for (const auto& stream : streams) { - RETURN_IF_ERROR(stream->close_load(tablets_to_commit)); - } - return Status::OK(); } } // namespace doris::vectorized diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h index 460b3acc33fc4e..7785733bf4a691 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.h +++ b/be/src/vec/sink/writer/vtablet_writer_v2.h @@ -69,7 +69,7 @@ namespace doris { class DeltaWriterV2; class LoadStreamStub; -class LoadStreams; +class LoadStreamMap; class ObjectPool; class RowDescriptor; class RuntimeState; @@ -121,9 +121,9 @@ class VTabletWriterV2 final : public AsyncResultWriter { Status _init(RuntimeState* state, RuntimeProfile* profile); - Status _open_streams(int64_t src_id); + Status _open_streams(); - Status _open_streams_to_backend(int64_t dst_id, LoadStreams& streams); + Status _open_streams_to_backend(int64_t dst_id, Streams& streams); Status _incremental_open_streams(const std::vector& partitions); @@ -140,7 +140,7 @@ class VTabletWriterV2 final : public AsyncResultWriter { Status _select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id, Streams& streams); - Status _close_load(const Streams& streams); + void _calc_tablets_to_commit(); Status _cancel(Status status); @@ -217,7 +217,7 @@ class VTabletWriterV2 final : public AsyncResultWriter { std::unordered_map> _tablets_for_node; std::unordered_map> _indexes_from_node; - std::unordered_map> _streams_for_node; + std::shared_ptr _streams_for_node; size_t _stream_index = 0; std::shared_ptr _delta_writer_for_tablet; diff --git a/be/test/io/fs/stream_sink_file_writer_test.cpp b/be/test/io/fs/stream_sink_file_writer_test.cpp index 7e5bdd350f5b37..ad6e496c56f60b 100644 --- a/be/test/io/fs/stream_sink_file_writer_test.cpp +++ b/be/test/io/fs/stream_sink_file_writer_test.cpp @@ -51,7 +51,9 @@ static std::atomic g_num_request; class StreamSinkFileWriterTest : public testing::Test { class MockStreamStub : public LoadStreamStub { public: - MockStreamStub(PUniqueId load_id, int64_t src_id) : LoadStreamStub(load_id, src_id, 1) {}; + MockStreamStub(PUniqueId load_id, int64_t src_id) + : LoadStreamStub(load_id, src_id, std::make_shared(), + std::make_shared()) {}; virtual ~MockStreamStub() = default; diff --git a/be/test/vec/exec/load_stream_stub_pool_test.cpp b/be/test/vec/exec/load_stream_stub_pool_test.cpp index 24da3bb6999f36..e576db3bdaa5f5 100644 --- a/be/test/vec/exec/load_stream_stub_pool_test.cpp +++ b/be/test/vec/exec/load_stream_stub_pool_test.cpp @@ -32,20 +32,29 @@ TEST_F(LoadStreamStubPoolTest, test) { LoadStreamStubPool pool; int64_t src_id = 100; PUniqueId load_id; - load_id.set_hi(1); + load_id.set_lo(1); load_id.set_hi(2); - auto streams1 = pool.get_or_create(load_id, src_id, 101, 5, 1); - auto streams2 = pool.get_or_create(load_id, src_id, 102, 5, 1); - auto streams3 = pool.get_or_create(load_id, src_id, 101, 5, 1); + PUniqueId load_id2; + load_id2.set_lo(2); + load_id2.set_hi(1); + auto streams_for_node1 = pool.get_or_create(load_id, src_id, 5, 2); + auto streams_for_node2 = pool.get_or_create(load_id, src_id, 5, 2); + EXPECT_EQ(1, pool.size()); + auto streams_for_node3 = pool.get_or_create(load_id2, src_id, 8, 1); EXPECT_EQ(2, pool.size()); - EXPECT_EQ(1, pool.templates_size()); - EXPECT_EQ(streams1, streams3); - EXPECT_NE(streams1, streams2); - streams1->release(); - streams2->release(); - streams3->release(); + EXPECT_EQ(streams_for_node1, streams_for_node2); + EXPECT_NE(streams_for_node1, streams_for_node3); + + EXPECT_EQ(5, streams_for_node1->get_or_create(101)->size()); + EXPECT_EQ(5, streams_for_node2->get_or_create(102)->size()); + EXPECT_EQ(8, streams_for_node3->get_or_create(101)->size()); + + EXPECT_TRUE(streams_for_node3->release()); + EXPECT_EQ(1, pool.size()); + EXPECT_FALSE(streams_for_node1->release()); + EXPECT_EQ(1, pool.size()); + EXPECT_TRUE(streams_for_node2->release()); EXPECT_EQ(0, pool.size()); - EXPECT_EQ(0, pool.templates_size()); } } // namespace doris