Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/runtime/exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
#include "util/debug_util.h"
#include "util/time.h"
#include "vec/sink/delta_writer_v2_pool.h"
#include "vec/sink/load_stream_stub_pool.h"
#include "vec/sink/load_stream_map_pool.h"

namespace doris {

Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class RuntimeQueryStatiticsMgr;
class TMasterInfo;
class LoadChannelMgr;
class LoadStreamMgr;
class LoadStreamStubPool;
class LoadStreamMapPool;
class StreamLoadExecutor;
class RoutineLoadTaskExecutor;
class SmallFileMgr;
Expand Down Expand Up @@ -264,7 +264,7 @@ class ExecEnv {
}

#endif
LoadStreamStubPool* load_stream_stub_pool() { return _load_stream_stub_pool.get(); }
LoadStreamMapPool* load_stream_map_pool() { return _load_stream_map_pool.get(); }

vectorized::DeltaWriterV2Pool* delta_writer_v2_pool() { return _delta_writer_v2_pool.get(); }

Expand Down Expand Up @@ -393,7 +393,7 @@ class ExecEnv {
// To save meta info of external file, such as parquet footer.
FileMetaCache* _file_meta_cache = nullptr;
std::unique_ptr<MemTableMemoryLimiter> _memtable_memory_limiter;
std::unique_ptr<LoadStreamStubPool> _load_stream_stub_pool;
std::unique_ptr<LoadStreamMapPool> _load_stream_map_pool;
std::unique_ptr<vectorized::DeltaWriterV2Pool> _delta_writer_v2_pool;
std::shared_ptr<WalManager> _wal_manager;
DNSCache* _dns_cache = nullptr;
Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@
#include "vec/exec/scan/scanner_scheduler.h"
#include "vec/runtime/vdata_stream_mgr.h"
#include "vec/sink/delta_writer_v2_pool.h"
#include "vec/sink/load_stream_stub_pool.h"
#include "vec/sink/load_stream_map_pool.h"
#include "vec/spill/spill_stream_manager.h"

#if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && \
Expand Down Expand Up @@ -245,7 +245,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
_block_spill_mgr = new BlockSpillManager(store_paths);
_group_commit_mgr = new GroupCommitMgr(this);
_memtable_memory_limiter = std::make_unique<MemTableMemoryLimiter>();
_load_stream_stub_pool = std::make_unique<LoadStreamStubPool>();
_load_stream_map_pool = std::make_unique<LoadStreamMapPool>();
_delta_writer_v2_pool = std::make_unique<vectorized::DeltaWriterV2Pool>();
_file_cache_open_fd_cache = std::make_unique<io::FDCache>();
_wal_manager = WalManager::create_shared(this, config::group_commit_wal_path);
Expand Down Expand Up @@ -595,7 +595,7 @@ void ExecEnv::destroy() {
_stream_load_executor.reset();
_memtable_memory_limiter.reset();
_delta_writer_v2_pool.reset();
_load_stream_stub_pool.reset();
_load_stream_map_pool.reset();
_file_cache_open_fd_cache.reset();

// StorageEngine must be destroyed before _page_no_cache_mem_tracker.reset and before _cache_manager is destroyed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,22 @@
// specific language governing permissions and limitations
// under the License.

#include "vec/sink/load_stream_stub_pool.h"
#include "vec/sink/load_stream_map_pool.h"

#include "util/debug_points.h"
#include "vec/sink/load_stream_stub.h"

namespace doris {
class TExpr;

LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use,
LoadStreamStubPool* pool)
LoadStreamMapPool* pool)
: _load_id(load_id),
_src_id(src_id),
_num_streams(num_streams),
_use_cnt(num_use),
_pool(pool) {
_pool(pool),
_tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()),
_enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()) {
DCHECK(num_streams > 0) << "stream num should be greater than 0";
DCHECK(num_use > 0) << "use num should be greater than 0";
}
Expand All @@ -41,10 +42,9 @@ std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id) {
return streams;
}
streams = std::make_shared<Streams>();
auto schema_map = std::make_shared<IndexToTabletSchema>();
auto mow_map = std::make_shared<IndexToEnableMoW>();
for (int i = 0; i < _num_streams; i++) {
streams->emplace_back(new LoadStreamStub(_load_id, _src_id, schema_map, mow_map));
streams->emplace_back(new LoadStreamStub(_load_id, _src_id, _tablet_schema_for_index,
_enable_unique_mow_for_index));
}
_streams_for_node[dst_id] = streams;
return streams;
Expand Down Expand Up @@ -103,11 +103,11 @@ Status LoadStreamMap::close_load() {
});
}

LoadStreamStubPool::LoadStreamStubPool() = default;
LoadStreamMapPool::LoadStreamMapPool() = default;

LoadStreamStubPool::~LoadStreamStubPool() = default;
std::shared_ptr<LoadStreamMap> LoadStreamStubPool::get_or_create(UniqueId load_id, int64_t src_id,
int num_streams, int num_use) {
LoadStreamMapPool::~LoadStreamMapPool() = default;
std::shared_ptr<LoadStreamMap> LoadStreamMapPool::get_or_create(UniqueId load_id, int64_t src_id,
int num_streams, int num_use) {
std::lock_guard<std::mutex> lock(_mutex);
std::shared_ptr<LoadStreamMap> streams = _pool[load_id];
if (streams != nullptr) {
Expand All @@ -118,7 +118,7 @@ std::shared_ptr<LoadStreamMap> LoadStreamStubPool::get_or_create(UniqueId load_i
return streams;
}

void LoadStreamStubPool::erase(UniqueId load_id) {
void LoadStreamMapPool::erase(UniqueId load_id) {
std::lock_guard<std::mutex> lock(_mutex);
_pool.erase(load_id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,20 @@
#include "vec/core/block.h"
#include "vec/data_types/data_type.h"
#include "vec/exprs/vexpr_fwd.h"
#include "vec/sink/load_stream_stub.h"

namespace doris {

class LoadStreamStub;

class LoadStreamStubPool;
class LoadStreamMapPool;

using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;

class LoadStreamMap {
public:
LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use,
LoadStreamStubPool* pool);
LoadStreamMapPool* pool);

std::shared_ptr<Streams> get_or_create(int64_t dst_id);

Expand Down Expand Up @@ -103,17 +104,19 @@ class LoadStreamMap {
std::atomic<int> _use_cnt;
std::mutex _mutex;
std::unordered_map<int64_t, std::shared_ptr<Streams>> _streams_for_node;
LoadStreamStubPool* _pool = nullptr;
LoadStreamMapPool* _pool = nullptr;
std::shared_ptr<IndexToTabletSchema> _tablet_schema_for_index;
std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index;

std::mutex _tablets_to_commit_mutex;
std::unordered_map<int64_t, std::vector<PTabletID>> _tablets_to_commit;
};

class LoadStreamStubPool {
class LoadStreamMapPool {
public:
LoadStreamStubPool();
LoadStreamMapPool();

~LoadStreamStubPool();
~LoadStreamMapPool();

std::shared_ptr<LoadStreamMap> get_or_create(UniqueId load_id, int64_t src_id, int num_streams,
int num_use);
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/load_stream_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ Status LoadStreamStub::get_schema(const std::vector<PTabletID>& tablets) {
PStreamHeader header;
*header.mutable_load_id() = _load_id;
header.set_src_id(_src_id);
header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
header.set_opcode(doris::PStreamHeader::GET_SCHEMA);
std::ostringstream oss;
oss << "fetching tablet schema from stream " << _stream_id
<< ", load id: " << print_id(_load_id) << ", tablet id:";
Expand Down
2 changes: 0 additions & 2 deletions be/src/vec/sink/volap_table_sink_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
#include "runtime/runtime_state.h"
#include "util/doris_metrics.h"
#include "vec/sink/delta_writer_v2_pool.h"
#include "vec/sink/load_stream_stub.h"
#include "vec/sink/load_stream_stub_pool.h"

namespace doris {
class TExpr;
Expand Down
55 changes: 27 additions & 28 deletions be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@
#include "vec/core/block.h"
#include "vec/sink/delta_writer_v2_pool.h"
// NOLINTNEXTLINE(unused-includes)
#include "vec/sink/load_stream_map_pool.h"
#include "vec/sink/load_stream_stub.h" // IWYU pragma: keep
#include "vec/sink/load_stream_stub_pool.h"
#include "vec/sink/vtablet_block_convertor.h"
#include "vec/sink/vtablet_finder.h"

Expand Down Expand Up @@ -96,7 +96,7 @@ Status VTabletWriterV2::_incremental_open_streams(
tablet.set_partition_id(partition->id);
tablet.set_index_id(index.index_id);
tablet.set_tablet_id(tablet_id);
if (!_streams_for_node->contains(node)) {
if (!_load_stream_map->contains(node)) {
new_backends.insert(node);
}
_tablets_for_node[node].emplace(tablet_id, tablet);
Expand All @@ -112,7 +112,7 @@ Status VTabletWriterV2::_incremental_open_streams(
}
}
for (int64_t dst_id : new_backends) {
auto streams = _streams_for_node->get_or_create(dst_id);
auto streams = _load_stream_map->get_or_create(dst_id);
RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
}
return Status::OK();
Expand Down Expand Up @@ -242,7 +242,7 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) {
} else {
_delta_writer_for_tablet = std::make_shared<DeltaWriterV2Map>(_load_id);
}
_streams_for_node = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
_load_stream_map = ExecEnv::GetInstance()->load_stream_map_pool()->get_or_create(
_load_id, _backend_id, _stream_per_node, _num_local_sink);
return Status::OK();
}
Expand All @@ -263,7 +263,7 @@ Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) {

Status VTabletWriterV2::_open_streams() {
for (auto& [dst_id, _] : _tablets_for_node) {
auto streams = _streams_for_node->get_or_create(dst_id);
auto streams = _load_stream_map->get_or_create(dst_id);
RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
}
return Status::OK();
Expand Down Expand Up @@ -361,7 +361,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id,
tablet.set_tablet_id(tablet_id);
VLOG_DEBUG << fmt::format("_select_streams P{} I{} T{}", partition_id, index_id, tablet_id);
_tablets_for_node[node_id].emplace(tablet_id, tablet);
streams.emplace_back(_streams_for_node->at(node_id)->at(_stream_index));
streams.emplace_back(_load_stream_map->at(node_id)->at(_stream_index));
RETURN_IF_ERROR(streams[0]->wait_for_schema(partition_id, index_id, tablet_id));
}
_stream_index = (_stream_index + 1) % _stream_per_node;
Expand Down Expand Up @@ -470,13 +470,13 @@ Status VTabletWriterV2::_cancel(Status status) {
_delta_writer_for_tablet->cancel(status);
_delta_writer_for_tablet.reset();
}
if (_streams_for_node) {
_streams_for_node->for_each([status](int64_t dst_id, const Streams& streams) {
if (_load_stream_map) {
_load_stream_map->for_each([status](int64_t dst_id, const Streams& streams) {
for (auto& stream : streams) {
stream->cancel(status);
}
});
_streams_for_node->release();
_load_stream_map->release();
}
return Status::OK();
}
Expand Down Expand Up @@ -542,29 +542,28 @@ Status VTabletWriterV2::close(Status exec_status) {
}

_calc_tablets_to_commit();
const bool is_last_sink = _streams_for_node->release();
const bool is_last_sink = _load_stream_map->release();
LOG(INFO) << "sink " << _sender_id << " released streams, is_last=" << is_last_sink
<< ", load_id=" << print_id(_load_id);

// send CLOSE_LOAD and close_wait on all streams
if (is_last_sink) {
RETURN_IF_ERROR(_streams_for_node->close_load());
RETURN_IF_ERROR(_load_stream_map->close_load());
SCOPED_TIMER(_close_load_timer);
RETURN_IF_ERROR(_streams_for_node->for_each_st(
[this](int64_t dst_id, const Streams& streams) -> Status {
for (auto& stream : streams) {
int64_t remain_ms =
static_cast<int64_t>(_state->execution_timeout()) * 1000 -
_timeout_watch.elapsed_time() / 1000 / 1000;
if (remain_ms <= 0) {
LOG(WARNING) << "load timed out before close waiting, load_id="
<< print_id(_load_id);
return Status::TimedOut("load timed out before close waiting");
}
RETURN_IF_ERROR(stream->close_wait(_state, remain_ms));
}
return Status::OK();
}));
RETURN_IF_ERROR(_load_stream_map->for_each_st([this](int64_t dst_id,
const Streams& streams) -> Status {
for (auto& stream : streams) {
int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000 -
_timeout_watch.elapsed_time() / 1000 / 1000;
if (remain_ms <= 0) {
LOG(WARNING) << "load timed out before close waiting, load_id="
<< print_id(_load_id);
return Status::TimedOut("load timed out before close waiting");
}
RETURN_IF_ERROR(stream->close_wait(_state, remain_ms));
}
return Status::OK();
}));
}

// calculate and submit commit info
Expand All @@ -573,7 +572,7 @@ Status VTabletWriterV2::close(Status exec_status) {
std::unordered_map<int64_t, Status> failed_reason;
std::vector<TTabletCommitInfo> tablet_commit_infos;

_streams_for_node->for_each([&](int64_t dst_id, const Streams& streams) {
_load_stream_map->for_each([&](int64_t dst_id, const Streams& streams) {
std::unordered_set<int64_t> known_tablets;
for (const auto& stream : streams) {
for (auto [tablet_id, reason] : stream->failed_tablets()) {
Expand Down Expand Up @@ -648,7 +647,7 @@ void VTabletWriterV2::_calc_tablets_to_commit() {
}
LOG(WARNING) << msg;
}
_streams_for_node->save_tablets_to_commit(dst_id, tablets_to_commit);
_load_stream_map->save_tablets_to_commit(dst_id, tablets_to_commit);
}
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/writer/vtablet_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ class VTabletWriterV2 final : public AsyncResultWriter {
std::unordered_map<int64_t, std::unordered_map<int64_t, PTabletID>> _tablets_for_node;
std::unordered_map<int64_t, std::vector<PTabletID>> _indexes_from_node;

std::shared_ptr<LoadStreamMap> _streams_for_node;
std::shared_ptr<LoadStreamMap> _load_stream_map;

size_t _stream_index = 0;
std::shared_ptr<DeltaWriterV2Map> _delta_writer_for_tablet;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,21 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/sink/load_stream_stub_pool.h"

#include <gtest/gtest.h>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: 'gtest/gtest.h' file not found [clang-diagnostic-error]

#include <gtest/gtest.h>
         ^


#include "vec/sink/load_stream_map_pool.h"
#include "vec/sink/load_stream_stub.h"

namespace doris {

class LoadStreamStubPoolTest : public testing::Test {
class LoadStreamMapPoolTest : public testing::Test {
public:
LoadStreamStubPoolTest() = default;
virtual ~LoadStreamStubPoolTest() = default;
LoadStreamMapPoolTest() = default;
virtual ~LoadStreamMapPoolTest() = default;
};

TEST_F(LoadStreamStubPoolTest, test) {
LoadStreamStubPool pool;
TEST_F(LoadStreamMapPoolTest, test) {
LoadStreamMapPool pool;
int64_t src_id = 100;
PUniqueId load_id;
load_id.set_lo(1);
Expand Down