diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index 8806ff2900f2ad..306ad94099f904 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -34,7 +34,7 @@ #include "util/debug_util.h" #include "util/time.h" #include "vec/sink/delta_writer_v2_pool.h" -#include "vec/sink/load_stream_stub_pool.h" +#include "vec/sink/load_stream_map_pool.h" namespace doris { diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 1ba35d6b20018a..e24727d845ca74 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -82,7 +82,7 @@ class RuntimeQueryStatiticsMgr; class TMasterInfo; class LoadChannelMgr; class LoadStreamMgr; -class LoadStreamStubPool; +class LoadStreamMapPool; class StreamLoadExecutor; class RoutineLoadTaskExecutor; class SmallFileMgr; @@ -264,7 +264,7 @@ class ExecEnv { } #endif - LoadStreamStubPool* load_stream_stub_pool() { return _load_stream_stub_pool.get(); } + LoadStreamMapPool* load_stream_map_pool() { return _load_stream_map_pool.get(); } vectorized::DeltaWriterV2Pool* delta_writer_v2_pool() { return _delta_writer_v2_pool.get(); } @@ -393,7 +393,7 @@ class ExecEnv { // To save meta info of external file, such as parquet footer. FileMetaCache* _file_meta_cache = nullptr; std::unique_ptr _memtable_memory_limiter; - std::unique_ptr _load_stream_stub_pool; + std::unique_ptr _load_stream_map_pool; std::unique_ptr _delta_writer_v2_pool; std::shared_ptr _wal_manager; DNSCache* _dns_cache = nullptr; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index c44605dc845fa8..a8b2131d883ad0 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -104,7 +104,7 @@ #include "vec/exec/scan/scanner_scheduler.h" #include "vec/runtime/vdata_stream_mgr.h" #include "vec/sink/delta_writer_v2_pool.h" -#include "vec/sink/load_stream_stub_pool.h" +#include "vec/sink/load_stream_map_pool.h" #include "vec/spill/spill_stream_manager.h" #if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && \ @@ -245,7 +245,7 @@ Status ExecEnv::_init(const std::vector& store_paths, _block_spill_mgr = new BlockSpillManager(store_paths); _group_commit_mgr = new GroupCommitMgr(this); _memtable_memory_limiter = std::make_unique(); - _load_stream_stub_pool = std::make_unique(); + _load_stream_map_pool = std::make_unique(); _delta_writer_v2_pool = std::make_unique(); _file_cache_open_fd_cache = std::make_unique(); _wal_manager = WalManager::create_shared(this, config::group_commit_wal_path); @@ -595,7 +595,7 @@ void ExecEnv::destroy() { _stream_load_executor.reset(); _memtable_memory_limiter.reset(); _delta_writer_v2_pool.reset(); - _load_stream_stub_pool.reset(); + _load_stream_map_pool.reset(); _file_cache_open_fd_cache.reset(); // StorageEngine must be destoried before _page_no_cache_mem_tracker.reset and _cache_manager destory diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp b/be/src/vec/sink/load_stream_map_pool.cpp similarity index 85% rename from be/src/vec/sink/load_stream_stub_pool.cpp rename to be/src/vec/sink/load_stream_map_pool.cpp index 3eae49aff7717a..f335f05e162831 100644 --- a/be/src/vec/sink/load_stream_stub_pool.cpp +++ b/be/src/vec/sink/load_stream_map_pool.cpp @@ -15,21 +15,22 @@ // specific language governing permissions and limitations // under the License. -#include "vec/sink/load_stream_stub_pool.h" +#include "vec/sink/load_stream_map_pool.h" #include "util/debug_points.h" -#include "vec/sink/load_stream_stub.h" namespace doris { class TExpr; LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use, - LoadStreamStubPool* pool) + LoadStreamMapPool* pool) : _load_id(load_id), _src_id(src_id), _num_streams(num_streams), _use_cnt(num_use), - _pool(pool) { + _pool(pool), + _tablet_schema_for_index(std::make_shared()), + _enable_unique_mow_for_index(std::make_shared()) { DCHECK(num_streams > 0) << "stream num should be greater than 0"; DCHECK(num_use > 0) << "use num should be greater than 0"; } @@ -41,10 +42,9 @@ std::shared_ptr LoadStreamMap::get_or_create(int64_t dst_id) { return streams; } streams = std::make_shared(); - auto schema_map = std::make_shared(); - auto mow_map = std::make_shared(); for (int i = 0; i < _num_streams; i++) { - streams->emplace_back(new LoadStreamStub(_load_id, _src_id, schema_map, mow_map)); + streams->emplace_back(new LoadStreamStub(_load_id, _src_id, _tablet_schema_for_index, + _enable_unique_mow_for_index)); } _streams_for_node[dst_id] = streams; return streams; @@ -103,11 +103,11 @@ Status LoadStreamMap::close_load() { }); } -LoadStreamStubPool::LoadStreamStubPool() = default; +LoadStreamMapPool::LoadStreamMapPool() = default; -LoadStreamStubPool::~LoadStreamStubPool() = default; -std::shared_ptr LoadStreamStubPool::get_or_create(UniqueId load_id, int64_t src_id, - int num_streams, int num_use) { +LoadStreamMapPool::~LoadStreamMapPool() = default; +std::shared_ptr LoadStreamMapPool::get_or_create(UniqueId load_id, int64_t src_id, + int num_streams, int num_use) { std::lock_guard lock(_mutex); std::shared_ptr streams = _pool[load_id]; if (streams != nullptr) { @@ -118,7 +118,7 @@ std::shared_ptr LoadStreamStubPool::get_or_create(UniqueId load_i return streams; } -void LoadStreamStubPool::erase(UniqueId load_id) { +void LoadStreamMapPool::erase(UniqueId load_id) { std::lock_guard lock(_mutex); _pool.erase(load_id); } diff --git a/be/src/vec/sink/load_stream_stub_pool.h b/be/src/vec/sink/load_stream_map_pool.h similarity index 91% rename from be/src/vec/sink/load_stream_stub_pool.h rename to be/src/vec/sink/load_stream_map_pool.h index 65f3bb66cd2236..aad12dba2aa4ac 100644 --- a/be/src/vec/sink/load_stream_stub_pool.h +++ b/be/src/vec/sink/load_stream_map_pool.h @@ -63,19 +63,20 @@ #include "vec/core/block.h" #include "vec/data_types/data_type.h" #include "vec/exprs/vexpr_fwd.h" +#include "vec/sink/load_stream_stub.h" namespace doris { class LoadStreamStub; -class LoadStreamStubPool; +class LoadStreamMapPool; using Streams = std::vector>; class LoadStreamMap { public: LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use, - LoadStreamStubPool* pool); + LoadStreamMapPool* pool); std::shared_ptr get_or_create(int64_t dst_id); @@ -103,17 +104,19 @@ class LoadStreamMap { std::atomic _use_cnt; std::mutex _mutex; std::unordered_map> _streams_for_node; - LoadStreamStubPool* _pool = nullptr; + LoadStreamMapPool* _pool = nullptr; + std::shared_ptr _tablet_schema_for_index; + std::shared_ptr _enable_unique_mow_for_index; std::mutex _tablets_to_commit_mutex; std::unordered_map> _tablets_to_commit; }; -class LoadStreamStubPool { +class LoadStreamMapPool { public: - LoadStreamStubPool(); + LoadStreamMapPool(); - ~LoadStreamStubPool(); + ~LoadStreamMapPool(); std::shared_ptr get_or_create(UniqueId load_id, int64_t src_id, int num_streams, int num_use); diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 2e118dce5c1384..b4a9ac186432a3 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -250,7 +250,7 @@ Status LoadStreamStub::get_schema(const std::vector& tablets) { PStreamHeader header; *header.mutable_load_id() = _load_id; header.set_src_id(_src_id); - header.set_opcode(doris::PStreamHeader::CLOSE_LOAD); + header.set_opcode(doris::PStreamHeader::GET_SCHEMA); std::ostringstream oss; oss << "fetching tablet schema from stream " << _stream_id << ", load id: " << print_id(_load_id) << ", tablet id:"; diff --git a/be/src/vec/sink/volap_table_sink_v2.cpp b/be/src/vec/sink/volap_table_sink_v2.cpp index a41b77ace53cab..a73ee483bd3886 100644 --- a/be/src/vec/sink/volap_table_sink_v2.cpp +++ b/be/src/vec/sink/volap_table_sink_v2.cpp @@ -31,8 +31,6 @@ #include "runtime/runtime_state.h" #include "util/doris_metrics.h" #include "vec/sink/delta_writer_v2_pool.h" -#include "vec/sink/load_stream_stub.h" -#include "vec/sink/load_stream_stub_pool.h" namespace doris { class TExpr; diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 42594f13d993af..bbe00b589ee04f 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -48,8 +48,8 @@ #include "vec/core/block.h" #include "vec/sink/delta_writer_v2_pool.h" // NOLINTNEXTLINE(unused-includes) +#include "vec/sink/load_stream_map_pool.h" #include "vec/sink/load_stream_stub.h" // IWYU pragma: keep -#include "vec/sink/load_stream_stub_pool.h" #include "vec/sink/vtablet_block_convertor.h" #include "vec/sink/vtablet_finder.h" @@ -96,7 +96,7 @@ Status VTabletWriterV2::_incremental_open_streams( tablet.set_partition_id(partition->id); tablet.set_index_id(index.index_id); tablet.set_tablet_id(tablet_id); - if (!_streams_for_node->contains(node)) { + if (!_load_stream_map->contains(node)) { new_backends.insert(node); } _tablets_for_node[node].emplace(tablet_id, tablet); @@ -112,7 +112,7 @@ Status VTabletWriterV2::_incremental_open_streams( } } for (int64_t dst_id : new_backends) { - auto streams = _streams_for_node->get_or_create(dst_id); + auto streams = _load_stream_map->get_or_create(dst_id); RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams)); } return Status::OK(); @@ -242,7 +242,7 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) { } else { _delta_writer_for_tablet = std::make_shared(_load_id); } - _streams_for_node = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create( + _load_stream_map = ExecEnv::GetInstance()->load_stream_map_pool()->get_or_create( _load_id, _backend_id, _stream_per_node, _num_local_sink); return Status::OK(); } @@ -263,7 +263,7 @@ Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) { Status VTabletWriterV2::_open_streams() { for (auto& [dst_id, _] : _tablets_for_node) { - auto streams = _streams_for_node->get_or_create(dst_id); + auto streams = _load_stream_map->get_or_create(dst_id); RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams)); } return Status::OK(); @@ -361,7 +361,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id, tablet.set_tablet_id(tablet_id); VLOG_DEBUG << fmt::format("_select_streams P{} I{} T{}", partition_id, index_id, tablet_id); _tablets_for_node[node_id].emplace(tablet_id, tablet); - streams.emplace_back(_streams_for_node->at(node_id)->at(_stream_index)); + streams.emplace_back(_load_stream_map->at(node_id)->at(_stream_index)); RETURN_IF_ERROR(streams[0]->wait_for_schema(partition_id, index_id, tablet_id)); } _stream_index = (_stream_index + 1) % _stream_per_node; @@ -470,13 +470,13 @@ Status VTabletWriterV2::_cancel(Status status) { _delta_writer_for_tablet->cancel(status); _delta_writer_for_tablet.reset(); } - if (_streams_for_node) { - _streams_for_node->for_each([status](int64_t dst_id, const Streams& streams) { + if (_load_stream_map) { + _load_stream_map->for_each([status](int64_t dst_id, const Streams& streams) { for (auto& stream : streams) { stream->cancel(status); } }); - _streams_for_node->release(); + _load_stream_map->release(); } return Status::OK(); } @@ -542,29 +542,28 @@ Status VTabletWriterV2::close(Status exec_status) { } _calc_tablets_to_commit(); - const bool is_last_sink = _streams_for_node->release(); + const bool is_last_sink = _load_stream_map->release(); LOG(INFO) << "sink " << _sender_id << " released streams, is_last=" << is_last_sink << ", load_id=" << print_id(_load_id); // send CLOSE_LOAD and close_wait on all streams if (is_last_sink) { - RETURN_IF_ERROR(_streams_for_node->close_load()); + RETURN_IF_ERROR(_load_stream_map->close_load()); SCOPED_TIMER(_close_load_timer); - RETURN_IF_ERROR(_streams_for_node->for_each_st( - [this](int64_t dst_id, const Streams& streams) -> Status { - for (auto& stream : streams) { - int64_t remain_ms = - static_cast(_state->execution_timeout()) * 1000 - - _timeout_watch.elapsed_time() / 1000 / 1000; - if (remain_ms <= 0) { - LOG(WARNING) << "load timed out before close waiting, load_id=" - << print_id(_load_id); - return Status::TimedOut("load timed out before close waiting"); - } - RETURN_IF_ERROR(stream->close_wait(_state, remain_ms)); - } - return Status::OK(); - })); + RETURN_IF_ERROR(_load_stream_map->for_each_st([this](int64_t dst_id, + const Streams& streams) -> Status { + for (auto& stream : streams) { + int64_t remain_ms = static_cast(_state->execution_timeout()) * 1000 - + _timeout_watch.elapsed_time() / 1000 / 1000; + if (remain_ms <= 0) { + LOG(WARNING) << "load timed out before close waiting, load_id=" + << print_id(_load_id); + return Status::TimedOut("load timed out before close waiting"); + } + RETURN_IF_ERROR(stream->close_wait(_state, remain_ms)); + } + return Status::OK(); + })); } // calculate and submit commit info @@ -573,7 +572,7 @@ Status VTabletWriterV2::close(Status exec_status) { std::unordered_map failed_reason; std::vector tablet_commit_infos; - _streams_for_node->for_each([&](int64_t dst_id, const Streams& streams) { + _load_stream_map->for_each([&](int64_t dst_id, const Streams& streams) { std::unordered_set known_tablets; for (const auto& stream : streams) { for (auto [tablet_id, reason] : stream->failed_tablets()) { @@ -648,7 +647,7 @@ void VTabletWriterV2::_calc_tablets_to_commit() { } LOG(WARNING) << msg; } - _streams_for_node->save_tablets_to_commit(dst_id, tablets_to_commit); + _load_stream_map->save_tablets_to_commit(dst_id, tablets_to_commit); } } diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h index 7785733bf4a691..c04cff15cf4b37 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.h +++ b/be/src/vec/sink/writer/vtablet_writer_v2.h @@ -217,7 +217,7 @@ class VTabletWriterV2 final : public AsyncResultWriter { std::unordered_map> _tablets_for_node; std::unordered_map> _indexes_from_node; - std::shared_ptr _streams_for_node; + std::shared_ptr _load_stream_map; size_t _stream_index = 0; std::shared_ptr _delta_writer_for_tablet; diff --git a/be/test/vec/exec/load_stream_stub_pool_test.cpp b/be/test/vec/exec/load_stream_stub_map_test.cpp similarity index 88% rename from be/test/vec/exec/load_stream_stub_pool_test.cpp rename to be/test/vec/exec/load_stream_stub_map_test.cpp index e576db3bdaa5f5..5f8743340ac232 100644 --- a/be/test/vec/exec/load_stream_stub_pool_test.cpp +++ b/be/test/vec/exec/load_stream_stub_map_test.cpp @@ -14,22 +14,21 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -#include "vec/sink/load_stream_stub_pool.h" - #include +#include "vec/sink/load_stream_map_pool.h" #include "vec/sink/load_stream_stub.h" namespace doris { -class LoadStreamStubPoolTest : public testing::Test { +class LoadStreamMapPoolTest : public testing::Test { public: - LoadStreamStubPoolTest() = default; - virtual ~LoadStreamStubPoolTest() = default; + LoadStreamMapPoolTest() = default; + virtual ~LoadStreamMapPoolTest() = default; }; -TEST_F(LoadStreamStubPoolTest, test) { - LoadStreamStubPool pool; +TEST_F(LoadStreamMapPoolTest, test) { + LoadStreamMapPool pool; int64_t src_id = 100; PUniqueId load_id; load_id.set_lo(1);