From 43abbe58c41e28920e0492402c467a3ff80e1444 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Tue, 22 Oct 2024 14:35:16 +0800 Subject: [PATCH] [fix](move-memtable) abstract multi-streams to one logical stream (#42039) Currently, an upstream BE (sink_v2) will open multiple streams to a downstream BE (load_stream). If any of the streams fails, the use_cnt on the downstream BE will be messed up. The load_stream will not report any success tablets to the sink_v2 since in its view there are still unfinished streams. So fault tolerance when open streams is not meaningful in practical, and may cause data lost. i.e. Upstream think there is still working streams to transfer data, but downstream does not report any commit info. This PR removes fault tolerance when open multiple streams to the same backend. If any of the open fails, the upstream sink_v2 should mark the downstream BE as failed replicas. --- be/src/vec/sink/load_stream_map_pool.cpp | 50 ++---- be/src/vec/sink/load_stream_map_pool.h | 12 +- be/src/vec/sink/load_stream_stub.cpp | 66 +++++++- be/src/vec/sink/load_stream_stub.h | 69 ++++++++ be/src/vec/sink/writer/vtablet_writer_v2.cpp | 160 ++++++++---------- be/src/vec/sink/writer/vtablet_writer_v2.h | 8 +- be/test/vec/sink/vtablet_writer_v2_test.cpp | 7 +- .../test_multi_replica_fault_injection.groovy | 2 - 8 files changed, 234 insertions(+), 140 deletions(-) diff --git a/be/src/vec/sink/load_stream_map_pool.cpp b/be/src/vec/sink/load_stream_map_pool.cpp index e8407f4730d24f..d6dddcc96dc2fe 100644 --- a/be/src/vec/sink/load_stream_map_pool.cpp +++ b/be/src/vec/sink/load_stream_map_pool.cpp @@ -35,22 +35,20 @@ LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, DCHECK(num_use > 0) << "use num should be greater than 0"; } -std::shared_ptr LoadStreamMap::get_or_create(int64_t dst_id, bool incremental) { +std::shared_ptr LoadStreamMap::get_or_create(int64_t dst_id, bool incremental) { std::lock_guard lock(_mutex); - std::shared_ptr streams = _streams_for_node[dst_id]; + std::shared_ptr streams = _streams_for_node[dst_id]; if (streams != nullptr) { return streams; } - streams = std::make_shared(); - for (int i = 0; i < _num_streams; i++) { - streams->emplace_back(new LoadStreamStub(_load_id, _src_id, _tablet_schema_for_index, - _enable_unique_mow_for_index, incremental)); - } + streams = std::make_shared(_num_streams, _load_id, _src_id, + _tablet_schema_for_index, + _enable_unique_mow_for_index, incremental); _streams_for_node[dst_id] = streams; return streams; } -std::shared_ptr LoadStreamMap::at(int64_t dst_id) { +std::shared_ptr LoadStreamMap::at(int64_t dst_id) { std::lock_guard lock(_mutex); return _streams_for_node.at(dst_id); } @@ -60,7 +58,7 @@ bool LoadStreamMap::contains(int64_t dst_id) { return _streams_for_node.contains(dst_id); } -void LoadStreamMap::for_each(std::function fn) { +void LoadStreamMap::for_each(std::function fn) { decltype(_streams_for_node) snapshot; { std::lock_guard lock(_mutex); @@ -71,7 +69,7 @@ void LoadStreamMap::for_each(std::function fn) { } } -Status LoadStreamMap::for_each_st(std::function fn) { +Status LoadStreamMap::for_each_st(std::function fn) { decltype(_streams_for_node) snapshot; { std::lock_guard lock(_mutex); @@ -108,7 +106,10 @@ bool LoadStreamMap::release() { } void LoadStreamMap::close_load(bool incremental) { - auto st = for_each_st([this, incremental](int64_t dst_id, const Streams& streams) -> Status { + for (auto& [dst_id, streams] : _streams_for_node) { + if (streams->is_incremental()) { + continue; + } std::vector tablets_to_commit; const auto& tablets = _tablets_to_commit[dst_id]; tablets_to_commit.reserve(tablets.size()); @@ -116,30 +117,11 @@ void LoadStreamMap::close_load(bool incremental) { tablets_to_commit.push_back(tablet); tablets_to_commit.back().set_num_segments(_segments_for_tablet[tablet_id]); } - Status status = Status::OK(); - bool first = true; - for (auto& stream : streams) { - if (stream->is_incremental() != incremental) { - continue; - } - if (first) { - auto st = stream->close_load(tablets_to_commit); - if (!st.ok() && status.ok()) { - status = st; - } - first = false; - } else { - auto st = stream->close_load({}); - if (!st.ok() && status.ok()) { - status = st; - } - } + auto st = streams->close_load(tablets_to_commit); + if (!st.ok()) { + LOG(WARNING) << "close_load for " << (incremental ? "incremental" : "non-incremental") + << " streams failed: " << st << ", load_id=" << _load_id; } - return status; - }); - if (!st.ok()) { - LOG(WARNING) << "close_load for " << (incremental ? "incremental" : "non-incremental") - << " streams failed: " << st << ", load_id=" << _load_id; } } diff --git a/be/src/vec/sink/load_stream_map_pool.h b/be/src/vec/sink/load_stream_map_pool.h index e5b66aaf9c9060..602f1711a94a5e 100644 --- a/be/src/vec/sink/load_stream_map_pool.h +++ b/be/src/vec/sink/load_stream_map_pool.h @@ -71,22 +71,20 @@ class LoadStreamStub; class LoadStreamMapPool; -using Streams = std::vector>; - class LoadStreamMap { public: LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use, LoadStreamMapPool* pool); - std::shared_ptr get_or_create(int64_t dst_id, bool incremental = false); + std::shared_ptr get_or_create(int64_t dst_id, bool incremental = false); - std::shared_ptr at(int64_t dst_id); + std::shared_ptr at(int64_t dst_id); bool contains(int64_t dst_id); - void for_each(std::function fn); + void for_each(std::function fn); - Status for_each_st(std::function fn); + Status for_each_st(std::function fn); void save_tablets_to_commit(int64_t dst_id, const std::vector& tablets_to_commit); @@ -107,7 +105,7 @@ class LoadStreamMap { const int _num_streams; std::atomic _use_cnt; std::mutex _mutex; - std::unordered_map> _streams_for_node; + std::unordered_map> _streams_for_node; LoadStreamMapPool* _pool = nullptr; std::shared_ptr _tablet_schema_for_index; std::shared_ptr _enable_unique_mow_for_index; diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 1d13ca4b90302a..672a0be44f7d5b 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -207,7 +207,8 @@ Status LoadStreamStub::open(BrpcClientCache* client_cache, LOG(INFO) << "open load stream to host=" << node_info.host << ", port=" << node_info.brpc_port << ", " << *this; _is_open.store(true); - return Status::OK(); + _status = Status::OK(); + return _status; } // APPEND_DATA @@ -504,4 +505,67 @@ inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub& stub) return ostr; } +Status LoadStreamStubs::open(BrpcClientCache* client_cache, + const NodeInfo& node_info, int64_t txn_id, + const OlapTableSchemaParam& schema, + const std::vector& tablets_for_schema, int total_streams, + int64_t idle_timeout_ms, bool enable_profile) { + bool get_schema = true; + auto status = Status::OK(); + for (auto& stream : _streams) { + Status st; + if (get_schema) { + st = stream->open(client_cache, node_info, txn_id, schema, tablets_for_schema, + total_streams, idle_timeout_ms, enable_profile); + } else { + st = stream->open(client_cache, node_info, txn_id, schema, {}, total_streams, + idle_timeout_ms, enable_profile); + } + if (st.ok()) { + get_schema = false; + } else { + LOG(WARNING) << "open stream failed: " << st << "; stream: " << *stream; + status = st; + // no break here to try get schema from the rest streams + } + } + // only mark open when all streams open success + _open_success.store(status.ok()); + // cancel all streams if open failed + if (!status.ok()) { + cancel(status); + } + return status; +} + +Status LoadStreamStubs::close_load(const std::vector& tablets_to_commit) { + if (!_open_success.load()) { + return Status::InternalError("streams not open"); + } + bool first = true; + auto status = Status::OK(); + for (auto& stream : _streams) { + Status st; + if (first) { + st = stream->close_load(tablets_to_commit); + first = false; + } else { + st = stream->close_load({}); + } + if (!st.ok()) { + LOG(WARNING) << "close_load failed: " << st << "; stream: " << *stream; + } + } + return status; +} + +Status LoadStreamStubs::close_wait(RuntimeState* state, int64_t timeout_ms) { + MonotonicStopWatch watch; + watch.start(); + for (auto& stream : _streams) { + RETURN_IF_ERROR(stream->close_wait(state, timeout_ms - watch.elapsed_time() / 1000 / 1000)); + } + return Status::OK(); +} + } // namespace doris diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index 223babb42e3f5b..241d7e612cea89 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -266,4 +266,73 @@ class LoadStreamStub : public std::enable_shared_from_this { bool _is_incremental = false; }; +// a collection of LoadStreams connect to the same node +class LoadStreamStubs { +public: + LoadStreamStubs(size_t num_streams, UniqueId load_id, int64_t src_id, + std::shared_ptr schema_map, + std::shared_ptr mow_map, bool incremental = false) + : _is_incremental(incremental) { + _streams.reserve(num_streams); + for (size_t i = 0; i < num_streams; i++) { + _streams.emplace_back( + new LoadStreamStub(load_id, src_id, schema_map, mow_map, incremental)); + } + } + + Status open(BrpcClientCache* client_cache, const NodeInfo& node_info, + int64_t txn_id, const OlapTableSchemaParam& schema, + const std::vector& tablets_for_schema, int total_streams, + int64_t idle_timeout_ms, bool enable_profile); + + bool is_incremental() const { return _is_incremental; } + + size_t size() const { return _streams.size(); } + + // for UT only + void mark_open() { _open_success.store(true); } + + std::shared_ptr select_one_stream() { + if (!_open_success.load()) { + return nullptr; + } + size_t i = _select_index.fetch_add(1); + return _streams[i % _streams.size()]; + } + + void cancel(Status reason) { + for (auto& stream : _streams) { + stream->cancel(reason); + } + } + + Status close_load(const std::vector& tablets_to_commit); + + Status close_wait(RuntimeState* state, int64_t timeout_ms = 0); + + std::unordered_set success_tablets() { + std::unordered_set s; + for (auto& stream : _streams) { + auto v = stream->success_tablets(); + std::copy(v.begin(), v.end(), std::inserter(s, s.end())); + } + return s; + } + + std::unordered_map failed_tablets() { + std::unordered_map m; + for (auto& stream : _streams) { + auto v = stream->failed_tablets(); + m.insert(v.begin(), v.end()); + } + return m; + } + +private: + std::vector> _streams; + std::atomic _open_success = false; + std::atomic _select_index = 0; + const bool _is_incremental; +}; + } // namespace doris diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 16c11b1cf4258b..c693e20c3a82f2 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -269,6 +269,8 @@ Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) { Status VTabletWriterV2::_open_streams() { bool fault_injection_skip_be = true; + bool any_backend = false; + bool any_success = false; for (auto& [dst_id, _] : _tablets_for_node) { auto streams = _load_stream_map->get_or_create(dst_id); DBUG_EXECUTE_IF("VTabletWriterV2._open_streams.skip_one_backend", { @@ -277,12 +279,17 @@ Status VTabletWriterV2::_open_streams() { continue; } }); - RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams)); + auto st = _open_streams_to_backend(dst_id, *streams); + any_backend = true; + any_success = any_success || st.ok(); + } + if (any_backend && !any_success) { + return Status::InternalError("failed to open streams to any BE"); } return Status::OK(); } -Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, Streams& streams) { +Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, LoadStreamStubs& streams) { const auto* node_info = _nodes_info->find_node(dst_id); DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.node_info_null", { node_info = nullptr; }); @@ -293,26 +300,14 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, Streams& stream std::vector& tablets_for_schema = _indexes_from_node[node_info->id]; DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.no_schema_when_open_streams", { tablets_for_schema.clear(); }); - int fault_injection_skip_cnt = 0; - for (auto& stream : streams) { - DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.one_stream_open_failure", { - if (fault_injection_skip_cnt < 1) { - fault_injection_skip_cnt++; - continue; - } - }); - auto st = stream->open(_state->exec_env()->brpc_streaming_client_cache(), *node_info, - _txn_id, *_schema, tablets_for_schema, _total_streams, - idle_timeout_ms, _state->enable_profile()); - if (st.ok()) { - // get tablet schema from each backend only in the 1st stream - tablets_for_schema.clear(); - } else { - LOG(WARNING) << "failed to open stream to backend " << dst_id - << ", load_id=" << print_id(_load_id); - } + auto st = streams.open(_state->exec_env()->brpc_streaming_client_cache(), *node_info, _txn_id, + *_schema, tablets_for_schema, _total_streams, idle_timeout_ms, + _state->enable_profile()); + if (!st.ok()) { + LOG(WARNING) << "failed to open stream to backend " << dst_id + << ", load_id=" << print_id(_load_id) << ", err=" << st; } - return Status::OK(); + return st; } Status VTabletWriterV2::_build_tablet_node_mapping() { @@ -375,7 +370,7 @@ void VTabletWriterV2::_generate_rows_for_tablet(std::vector& r } Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id, - Streams& streams) { + std::vector>& streams) { const auto* location = _location->find_tablet(tablet_id); DBUG_EXECUTE_IF("VTabletWriterV2._select_streams.location_null", { location = nullptr; }); if (location == nullptr) { @@ -388,13 +383,16 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id, tablet.set_tablet_id(tablet_id); VLOG_DEBUG << fmt::format("_select_streams P{} I{} T{}", partition_id, index_id, tablet_id); _tablets_for_node[node_id].emplace(tablet_id, tablet); - auto stream = _load_stream_map->at(node_id)->at(_stream_index); - for (int i = 1; i < _stream_per_node && !stream->is_open(); i++) { - stream = _load_stream_map->at(node_id)->at((_stream_index + i) % _stream_per_node); + auto stream = _load_stream_map->at(node_id)->select_one_stream(); + if (stream == nullptr) { + continue; } streams.emplace_back(std::move(stream)); } - _stream_index = (_stream_index + 1) % _stream_per_node; + if (streams.size() <= location->node_ids.size() / 2) { + return Status::InternalError("not enough streams {}/{}", streams.size(), + location->node_ids.size()); + } Status st; for (auto& stream : streams) { st = stream->wait_for_schema(partition_id, index_id, tablet_id); @@ -458,9 +456,10 @@ Status VTabletWriterV2::write(Block& input_block) { Status VTabletWriterV2::_write_memtable(std::shared_ptr block, int64_t tablet_id, const Rows& rows) { + auto st = Status::OK(); auto delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id, [&]() { - Streams streams; - auto st = _select_streams(tablet_id, rows.partition_id, rows.index_id, streams); + std::vector> streams; + st = _select_streams(tablet_id, rows.partition_id, rows.index_id, streams); if (!st.ok()) [[unlikely]] { LOG(WARNING) << "select stream failed, " << st << ", load_id=" << print_id(_load_id); return std::unique_ptr(nullptr); @@ -487,7 +486,8 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr block } DBUG_EXECUTE_IF("VTabletWriterV2._write_memtable.index_not_found", { index_not_found = true; }); - if (index_not_found) { + if (index_not_found) [[unlikely]] { + st = Status::InternalError("no index {} in schema", rows.index_id); LOG(WARNING) << "index " << rows.index_id << " not found in schema, load_id=" << print_id(_load_id); return std::unique_ptr(nullptr); @@ -496,15 +496,15 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr block }); if (delta_writer == nullptr) { LOG(WARNING) << "failed to open DeltaWriter for tablet " << tablet_id - << ", load_id=" << print_id(_load_id); - return Status::InternalError("failed to open DeltaWriter for tablet {}", tablet_id); + << ", load_id=" << print_id(_load_id) << ", err: " << st; + return Status::InternalError("failed to open DeltaWriter {}: {}", tablet_id, st.msg()); } { SCOPED_TIMER(_wait_mem_limit_timer); ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush(); } SCOPED_TIMER(_write_memtable_timer); - auto st = delta_writer->write(block.get(), rows.row_idxes); + st = delta_writer->write(block.get(), rows.row_idxes); return st; } @@ -517,11 +517,8 @@ void VTabletWriterV2::_cancel(Status status) { _delta_writer_for_tablet.reset(); } if (_load_stream_map) { - _load_stream_map->for_each([status](int64_t dst_id, const Streams& streams) { - for (auto& stream : streams) { - stream->cancel(status); - } - }); + _load_stream_map->for_each( + [status](int64_t dst_id, LoadStreamStubs& streams) { streams.cancel(status); }); _load_stream_map->release(); } } @@ -624,17 +621,14 @@ Status VTabletWriterV2::close(Status exec_status) { DBUG_EXECUTE_IF("VTabletWriterV2.close.add_failed_tablet", { auto streams = _load_stream_map->at(_tablets_for_node.begin()->first); int64_t tablet_id = -1; - for (auto& stream : *streams) { - const auto& tablets = stream->success_tablets(); - if (tablets.size() > 0) { - tablet_id = tablets[0]; - break; - } + for (auto tablet : streams->success_tablets()) { + tablet_id = tablet; + break; } if (tablet_id != -1) { LOG(INFO) << "fault injection: adding failed tablet_id: " << tablet_id; - streams->front()->add_failed_tablet(tablet_id, - Status::InternalError("fault injection")); + streams->select_one_stream()->add_failed_tablet( + tablet_id, Status::InternalError("fault injection")); } else { LOG(INFO) << "fault injection: failed to inject failed tablet_id"; } @@ -672,26 +666,24 @@ Status VTabletWriterV2::close(Status exec_status) { void VTabletWriterV2::_close_wait(bool incremental) { SCOPED_TIMER(_close_load_timer); auto st = _load_stream_map->for_each_st( - [this, incremental](int64_t dst_id, const Streams& streams) -> Status { - Status status = Status::OK(); - for (auto& stream : streams) { - if (stream->is_incremental() != incremental) { - continue; - } - int64_t remain_ms = static_cast(_state->execution_timeout()) * 1000 - - _timeout_watch.elapsed_time() / 1000 / 1000; - DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { remain_ms = 0; }); - if (remain_ms <= 0) { - LOG(WARNING) << "load timed out before close waiting, load_id=" - << print_id(_load_id); - return Status::TimedOut("load timed out before close waiting"); - } - auto st = stream->close_wait(_state, remain_ms); - if (!st.ok() && status.ok()) { - status = st; - } + [this, incremental](int64_t dst_id, LoadStreamStubs& streams) -> Status { + if (streams.is_incremental() != incremental) { + return Status::OK(); + } + int64_t remain_ms = static_cast(_state->execution_timeout()) * 1000 - + _timeout_watch.elapsed_time() / 1000 / 1000; + DBUG_EXECUTE_IF("VTabletWriterV2._close_wait.load_timeout", { remain_ms = 0; }); + if (remain_ms <= 0) { + LOG(WARNING) << "load timed out before close waiting, load_id=" + << print_id(_load_id); + return Status::TimedOut("load timed out before close waiting"); } - return status; + auto st = streams.close_wait(_state, remain_ms); + if (!st.ok()) { + LOG(WARNING) << "close_wait timeout on streams to dst_id=" << dst_id + << ", load_id=" << print_id(_load_id) << ": " << st; + } + return st; }); if (!st.ok()) { LOG(WARNING) << "close_wait failed: " << st << ", load_id=" << print_id(_load_id); @@ -730,31 +722,23 @@ Status VTabletWriterV2::_create_commit_info(std::vector& tabl int num_replicas) { std::unordered_map failed_tablets; std::unordered_map failed_reason; - load_stream_map->for_each([&](int64_t dst_id, const Streams& streams) { - std::unordered_set known_tablets; - for (const auto& stream : streams) { - LOG(INFO) << "stream " << stream->stream_id() - << " success tablets: " << stream->success_tablets().size() - << ", failed tablets: " << stream->failed_tablets().size(); - for (auto [tablet_id, reason] : stream->failed_tablets()) { - if (known_tablets.contains(tablet_id)) { - continue; - } - known_tablets.insert(tablet_id); - failed_tablets[tablet_id]++; - failed_reason[tablet_id] = reason; - } - for (auto tablet_id : stream->success_tablets()) { - if (known_tablets.contains(tablet_id)) { - continue; - } - known_tablets.insert(tablet_id); - TTabletCommitInfo commit_info; - commit_info.tabletId = tablet_id; - commit_info.backendId = dst_id; - tablet_commit_infos.emplace_back(std::move(commit_info)); - } + load_stream_map->for_each([&](int64_t dst_id, LoadStreamStubs& streams) { + size_t num_success_tablets = 0; + size_t num_failed_tablets = 0; + for (auto [tablet_id, reason] : streams.failed_tablets()) { + failed_tablets[tablet_id]++; + failed_reason[tablet_id] = reason; + num_failed_tablets++; + } + for (auto tablet_id : streams.success_tablets()) { + TTabletCommitInfo commit_info; + commit_info.tabletId = tablet_id; + commit_info.backendId = dst_id; + tablet_commit_infos.emplace_back(std::move(commit_info)); + num_success_tablets++; } + LOG(INFO) << "streams to dst_id: " << dst_id << ", success tablets: " << num_success_tablets + << ", failed tablets: " << num_failed_tablets; }); for (auto [tablet_id, replicas] : failed_tablets) { diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h index f65e0c8f3cd426..b50044ede938c4 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.h +++ b/be/src/vec/sink/writer/vtablet_writer_v2.h @@ -69,6 +69,7 @@ namespace doris { class DeltaWriterV2; class LoadStreamStub; +class LoadStreamStubs; class LoadStreamMap; class ObjectPool; class RowDescriptor; @@ -85,8 +86,6 @@ class OlapTabletFinder; class VTabletWriterV2; class DeltaWriterV2Map; -using Streams = std::vector>; - struct Rows { int64_t partition_id; int64_t index_id; @@ -128,7 +127,7 @@ class VTabletWriterV2 final : public AsyncResultWriter { Status _open_streams(); - Status _open_streams_to_backend(int64_t dst_id, Streams& streams); + Status _open_streams_to_backend(int64_t dst_id, LoadStreamStubs& streams); Status _incremental_open_streams(const std::vector& partitions); @@ -143,7 +142,7 @@ class VTabletWriterV2 final : public AsyncResultWriter { const Rows& rows); Status _select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id, - Streams& streams); + std::vector>& streams); void _calc_tablets_to_commit(); @@ -226,7 +225,6 @@ class VTabletWriterV2 final : public AsyncResultWriter { std::shared_ptr _load_stream_map; - size_t _stream_index = 0; std::shared_ptr _delta_writer_for_tablet; VRowDistribution _row_distribution; diff --git a/be/test/vec/sink/vtablet_writer_v2_test.cpp b/be/test/vec/sink/vtablet_writer_v2_test.cpp index 6289896c75fbb5..67dc9d089ab482 100644 --- a/be/test/vec/sink/vtablet_writer_v2_test.cpp +++ b/be/test/vec/sink/vtablet_writer_v2_test.cpp @@ -37,12 +37,13 @@ const int64_t src_id = 1000; static void add_stream(std::shared_ptr load_stream_map, int64_t node_id, std::vector success_tablets, std::unordered_map failed_tablets) { - auto stub = load_stream_map->get_or_create(node_id); + auto streams = load_stream_map->get_or_create(node_id); + streams->mark_open(); for (const auto& tablet_id : success_tablets) { - stub->at(0)->add_success_tablet(tablet_id); + streams->select_one_stream()->add_success_tablet(tablet_id); } for (const auto& [tablet_id, reason] : failed_tablets) { - stub->at(0)->add_failed_tablet(tablet_id, reason); + streams->select_one_stream()->add_failed_tablet(tablet_id, reason); } } diff --git a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy index 33f7e28dbc930a..2f6afd5ca6925b 100644 --- a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy @@ -97,8 +97,6 @@ suite("test_multi_replica_fault_injection", "nonConcurrent") { load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", "failed to send segment data to any replicas") // test segment num check when LoadStreamStub missed tail segments load_with_injection("LoadStreamStub.only_send_segment_0", "segment num mismatch") - // test 1st stream to each backend failure - load_with_injection("VTabletWriterV2._open_streams_to_backend.one_stream_open_failure", "success") // test one backend open failure load_with_injection("VTabletWriterV2._open_streams.skip_one_backend", "success") sql """ set enable_memtable_on_sink_node=false """