From b87bc4a7a371bdd26656b652465cced2076f76ca Mon Sep 17 00:00:00 2001 From: ZhangYu0123 Date: Tue, 25 Aug 2020 12:46:47 +0800 Subject: [PATCH 1/5] tmp commit --- be/src/olap/tablet.cpp | 94 +++++++++++++++++------------------ be/src/olap/tablet.h | 2 +- be/src/olap/tablet_meta.cpp | 34 ++++++++----- be/src/olap/tablet_meta.h | 6 +-- be/src/olap/version_graph.cpp | 16 +++++- be/src/olap/version_graph.h | 5 ++ 6 files changed, 92 insertions(+), 65 deletions(-) diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index a382420f5ca079..4a654356523b9d 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -279,14 +279,14 @@ const RowsetSharedPtr Tablet::get_rowset_by_version(const Version& version) cons // This function only be called by SnapshotManager to perform incremental clone. // It will be called under protected of _meta_lock(SnapshotManager will fetch it manually), // so it is no need to lock here. -const RowsetSharedPtr Tablet::get_inc_rowset_by_version(const Version& version) const { - auto iter = _inc_rs_version_map.find(version); - if (iter == _inc_rs_version_map.end()) { - VLOG(3) << "no rowset for version:" << version << ", tablet: " << full_name(); - return nullptr; - } - return iter->second; -} +// const RowsetSharedPtr Tablet::get_inc_rowset_by_version(const Version& version) const { +// auto iter = _inc_rs_version_map.find(version); +// if (iter == _inc_rs_version_map.end()) { +// VLOG(3) << "no rowset for version:" << version << ", tablet: " << full_name(); +// return nullptr; +// } +// return iter->second; +// } // Already under _meta_lock const RowsetSharedPtr Tablet::rowset_with_max_version() const { @@ -336,18 +336,18 @@ OLAPStatus Tablet::add_inc_rowset(const RowsetSharedPtr& rowset) { return OLAP_SUCCESS; } -void Tablet::_delete_inc_rowset_by_version(const Version& version, - const VersionHash& version_hash) { - // delete incremental rowset from map - _inc_rs_version_map.erase(version); +// void Tablet::_delete_inc_rowset_by_version(const Version& version, +// const VersionHash& version_hash) { +// // delete incremental rowset from map +// _inc_rs_version_map.erase(version); - RowsetMetaSharedPtr rowset_meta = _tablet_meta->acquire_inc_rs_meta_by_version(version); - if (rowset_meta == nullptr) { - return; - } - _tablet_meta->delete_inc_rs_meta_by_version(version); - VLOG(3) << "delete incremental rowset. tablet=" << full_name() << ", version=" << version; -} +// RowsetMetaSharedPtr rowset_meta = _tablet_meta->acquire_inc_rs_meta_by_version(version); +// if (rowset_meta == nullptr) { +// return; +// } +// _tablet_meta->delete_inc_rs_meta_by_version(version); +// VLOG(3) << "delete incremental rowset. tablet=" << full_name() << ", version=" << version; +// } void Tablet::_delete_stale_rowset_by_version(const Version& version) { @@ -359,34 +359,34 @@ void Tablet::_delete_stale_rowset_by_version(const Version& version) { VLOG(3) << "delete stale rowset. tablet=" << full_name() << ", version=" << version; } -void Tablet::delete_expired_inc_rowsets() { - int64_t now = UnixSeconds(); - vector> expired_versions; - WriteLock wrlock(&_meta_lock); - for (auto& rs_meta : _tablet_meta->all_inc_rs_metas()) { - double diff = ::difftime(now, rs_meta->creation_time()); - if (diff >= config::inc_rowset_expired_sec) { - Version version(rs_meta->version()); - expired_versions.push_back(std::make_pair(version, rs_meta->version_hash())); - VLOG(3) << "find expire incremental rowset. tablet=" << full_name() - << ", version=" << version - << ", version_hash=" << rs_meta->version_hash() - << ", exist_sec=" << diff; - } - } - - if (expired_versions.empty()) { - return; - } - - for (auto& pair: expired_versions) { - _delete_inc_rowset_by_version(pair.first, pair.second); - VLOG(3) << "delete expire incremental data. tablet=" << full_name() - << ", version=" << pair.first; - } - - save_meta(); -} +// void Tablet::delete_expired_inc_rowsets() { +// int64_t now = UnixSeconds(); +// vector> expired_versions; +// WriteLock wrlock(&_meta_lock); +// for (auto& rs_meta : _tablet_meta->all_inc_rs_metas()) { +// double diff = ::difftime(now, rs_meta->creation_time()); +// if (diff >= config::inc_rowset_expired_sec) { +// Version version(rs_meta->version()); +// expired_versions.push_back(std::make_pair(version, rs_meta->version_hash())); +// VLOG(3) << "find expire incremental rowset. tablet=" << full_name() +// << ", version=" << version +// << ", version_hash=" << rs_meta->version_hash() +// << ", exist_sec=" << diff; +// } +// } + +// if (expired_versions.empty()) { +// return; +// } + +// for (auto& pair: expired_versions) { +// _delete_inc_rowset_by_version(pair.first, pair.second); +// VLOG(3) << "delete expire incremental data. tablet=" << full_name() +// << ", version=" << pair.first; +// } + +// save_meta(); +// } void Tablet::delete_expired_stale_rowset() { diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index e8372fb5c51603..8f0885f8f525ef 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -240,7 +240,7 @@ class Tablet : public BaseTablet { void _max_continuous_version_from_begining_unlocked(Version* version, VersionHash* v_hash) const ; RowsetSharedPtr _rowset_with_largest_size(); - void _delete_inc_rowset_by_version(const Version& version, const VersionHash& version_hash); + // void _delete_inc_rowset_by_version(const Version& version, const VersionHash& version_hash); /// Delete stale rowset by version. This method not only delete the version in expired rowset map, /// but also delete the version in rowset meta vector. void _delete_stale_rowset_by_version(const Version& version); diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 5fb25ce27dfcf1..6b0d0f46514356 100755 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -532,26 +532,34 @@ void TabletMeta::revise_rs_metas(std::vector&& rs_metas) { _rs_metas = std::move(rs_metas); } -void TabletMeta::revise_inc_rs_metas(std::vector&& rs_metas) { +void TabletMeta::revise_stale_rs_metas(std::vector&& stale_rs_metas) { WriteLock wrlock(&_meta_lock); // delete alter task _alter_task.reset(); - _inc_rs_metas = std::move(rs_metas); + _stale_rs_metas = std::move(stale_rs_metas); } -OLAPStatus TabletMeta::add_inc_rs_meta(const RowsetMetaSharedPtr& rs_meta) { - // check RowsetMeta is valid - for (auto rs : _inc_rs_metas) { - if (rs->version() == rs_meta->version()) { - LOG(WARNING) << "rowset already exist. rowset_id=" << rs->rowset_id(); - return OLAP_ERR_ROWSET_ALREADY_EXIST; - } - } +// void TabletMeta::revise_inc_rs_metas(std::vector&& rs_metas) { +// WriteLock wrlock(&_meta_lock); +// // delete alter task +// _alter_task.reset(); - _inc_rs_metas.push_back(rs_meta); - return OLAP_SUCCESS; -} +// _inc_rs_metas = std::move(rs_metas); +// } + +// OLAPStatus TabletMeta::add_inc_rs_meta(const RowsetMetaSharedPtr& rs_meta) { +// // check RowsetMeta is valid +// for (auto rs : _inc_rs_metas) { +// if (rs->version() == rs_meta->version()) { +// LOG(WARNING) << "rowset already exist. rowset_id=" << rs->rowset_id(); +// return OLAP_ERR_ROWSET_ALREADY_EXIST; +// } +// } + +// _inc_rs_metas.push_back(rs_meta); +// return OLAP_SUCCESS; +// } void TabletMeta::delete_stale_rs_meta_by_version(const Version& version) { auto it = _stale_rs_metas.begin(); diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 8ae8ccdd144b91..78d2c3a4874fbf 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -172,8 +172,8 @@ class TabletMeta { void revise_rs_metas(std::vector&& rs_metas); - void revise_inc_rs_metas(std::vector&& rs_metas); - inline const std::vector& all_inc_rs_metas() const; + void revise_stale_rs_metas(std::vector&& rs_metas); + // inline const std::vector& all_inc_rs_metas() const; inline const std::vector& all_stale_rs_metas() const; OLAPStatus add_inc_rs_meta(const RowsetMetaSharedPtr& rs_meta); void delete_inc_rs_meta_by_version(const Version& version); @@ -224,7 +224,7 @@ class TabletMeta { TabletSchema _schema; std::vector _rs_metas; - std::vector _inc_rs_metas; + // std::vector _inc_rs_metas; // This variable _stale_rs_metas is used to record these rowsets‘ meta which are be compacted. // These stale rowsets meta are been removed when rowsets' pathVersion is expired, // this policy is judged and computed by TimestampedVersionTracker. diff --git a/be/src/olap/version_graph.cpp b/be/src/olap/version_graph.cpp index a64153c7e23ec4..8e49f112e648f7 100644 --- a/be/src/olap/version_graph.cpp +++ b/be/src/olap/version_graph.cpp @@ -29,7 +29,7 @@ namespace doris { void TimestampedVersionTracker::_construct_versioned_tracker(const std::vector& rs_metas) { int64_t max_version = 0; - // construct the roset graph + // construct the rowset graph _version_graph.reconstruct_version_graph(rs_metas, &max_version); } @@ -43,6 +43,20 @@ void TimestampedVersionTracker::construct_versioned_tracker(const std::vector& rs_metas, + std::vector>& stale_rs_metas) { + + if (rs_metas.empty()) { + VLOG(3) << "there is no version in the header."; + return; + } + construct_versioned_tracker(rs_metas); + + for (auto& path: stale_rs_metas) { + add_stale_path_version(path); + } +} void TimestampedVersionTracker::get_stale_version_path_json_doc(rapidjson::Document& path_arr) { diff --git a/be/src/olap/version_graph.h b/be/src/olap/version_graph.h index b8cac9ffd162e5..f49641347f81b4 100644 --- a/be/src/olap/version_graph.h +++ b/be/src/olap/version_graph.h @@ -139,6 +139,11 @@ class TimestampedVersionTracker { /// Construct rowsets version tracker by rs_metas and stale version path map. void construct_versioned_tracker(const std::vector& rs_metas); + /// Construct rowsets version tracker by rs_metas and stale version path map and stale rs meta + void construct_versioned_tracker( + const std::vector& rs_metas, + const std::vector& stale_metas)); + /// Recover rowsets version tracker from stale version path map. When delete operation fails, the /// tracker can be recovered from deleted stale_version_path_map. void recover_versioned_tracker(const std::map& stale_version_path_map); From fd34caf7b61b1b857ef8e37096d4eae46601251c Mon Sep 17 00:00:00 2001 From: ZhangYu0123 Date: Wed, 26 Aug 2020 10:32:43 +0800 Subject: [PATCH 2/5] stale rowsets persistence --- be/src/olap/tablet.cpp | 124 ++++++++------ be/src/olap/tablet.h | 3 +- be/src/olap/tablet_meta.cpp | 47 +++--- be/src/olap/tablet_meta.h | 7 +- be/src/olap/version_graph.cpp | 155 +++++++++++++++++- be/src/olap/version_graph.h | 21 ++- .../olap/timestamped_version_tracker_test.cpp | 17 ++ gensrc/proto/olap_file.proto | 1 + 8 files changed, 293 insertions(+), 82 deletions(-) diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 4a654356523b9d..59fe0cedd3fb90 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -63,8 +63,9 @@ Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir, _last_base_compaction_success_millis(0), _cumulative_point(K_INVALID_CUMULATIVE_POINT), _cumulative_compaction_type(cumulative_compaction_type) { - // change _rs_graph to _timestamped_version_tracker - _timestamped_version_tracker.construct_versioned_tracker(_tablet_meta->all_rs_metas()); + // construct _timestamped_versioned_tracker + _timestamped_version_tracker.construct_versioned_tracker(_tablet_meta->all_rs_metas(), + _tablet_meta->all_stale_rs_metas()); } OLAPStatus Tablet::_init_once_action() { @@ -108,6 +109,22 @@ OLAPStatus Tablet::_init_once_action() { _inc_rs_version_map[version] = std::move(rowset); } + // init stale rowset + for (auto& stale_rs_meta : _tablet_meta->all_stale_rs_metas()) { + Version version = stale_rs_meta->version(); + RowsetSharedPtr rowset = get_stale_rowset_by_version(version); + if (rowset == nullptr) { + res = RowsetFactory::create_rowset(&_schema, _tablet_path, stale_rs_meta, &rowset); + if (res != OLAP_SUCCESS) { + LOG(WARNING) << "fail to init stale rowset. tablet_id:" << tablet_id() + << ", schema_hash:" << schema_hash() << ", version=" << version + << ", res:" << res; + return res; + } + } + _stale_rs_version_map[version] = std::move(rowset); + } + return res; } @@ -276,17 +293,26 @@ const RowsetSharedPtr Tablet::get_rowset_by_version(const Version& version) cons return iter->second; } +const RowsetSharedPtr Tablet::get_stale_rowset_by_version(const Version& version) const { + auto iter = _stale_rs_version_map.find(version); + if (iter == _stale_rs_version_map.end()) { + VLOG(3) << "no rowset for version:" << version << ", tablet: " << full_name(); + return nullptr; + } + return iter->second; +} + // This function only be called by SnapshotManager to perform incremental clone. // It will be called under protected of _meta_lock(SnapshotManager will fetch it manually), // so it is no need to lock here. -// const RowsetSharedPtr Tablet::get_inc_rowset_by_version(const Version& version) const { -// auto iter = _inc_rs_version_map.find(version); -// if (iter == _inc_rs_version_map.end()) { -// VLOG(3) << "no rowset for version:" << version << ", tablet: " << full_name(); -// return nullptr; -// } -// return iter->second; -// } +const RowsetSharedPtr Tablet::get_inc_rowset_by_version(const Version& version) const { + auto iter = _inc_rs_version_map.find(version); + if (iter == _inc_rs_version_map.end()) { + VLOG(3) << "no rowset for version:" << version << ", tablet: " << full_name(); + return nullptr; + } + return iter->second; +} // Already under _meta_lock const RowsetSharedPtr Tablet::rowset_with_max_version() const { @@ -336,18 +362,18 @@ OLAPStatus Tablet::add_inc_rowset(const RowsetSharedPtr& rowset) { return OLAP_SUCCESS; } -// void Tablet::_delete_inc_rowset_by_version(const Version& version, -// const VersionHash& version_hash) { -// // delete incremental rowset from map -// _inc_rs_version_map.erase(version); +void Tablet::_delete_inc_rowset_by_version(const Version& version, + const VersionHash& version_hash) { + // delete incremental rowset from map + _inc_rs_version_map.erase(version); -// RowsetMetaSharedPtr rowset_meta = _tablet_meta->acquire_inc_rs_meta_by_version(version); -// if (rowset_meta == nullptr) { -// return; -// } -// _tablet_meta->delete_inc_rs_meta_by_version(version); -// VLOG(3) << "delete incremental rowset. tablet=" << full_name() << ", version=" << version; -// } + RowsetMetaSharedPtr rowset_meta = _tablet_meta->acquire_inc_rs_meta_by_version(version); + if (rowset_meta == nullptr) { + return; + } + _tablet_meta->delete_inc_rs_meta_by_version(version); + VLOG(3) << "delete incremental rowset. tablet=" << full_name() << ", version=" << version; +} void Tablet::_delete_stale_rowset_by_version(const Version& version) { @@ -359,34 +385,34 @@ void Tablet::_delete_stale_rowset_by_version(const Version& version) { VLOG(3) << "delete stale rowset. tablet=" << full_name() << ", version=" << version; } -// void Tablet::delete_expired_inc_rowsets() { -// int64_t now = UnixSeconds(); -// vector> expired_versions; -// WriteLock wrlock(&_meta_lock); -// for (auto& rs_meta : _tablet_meta->all_inc_rs_metas()) { -// double diff = ::difftime(now, rs_meta->creation_time()); -// if (diff >= config::inc_rowset_expired_sec) { -// Version version(rs_meta->version()); -// expired_versions.push_back(std::make_pair(version, rs_meta->version_hash())); -// VLOG(3) << "find expire incremental rowset. tablet=" << full_name() -// << ", version=" << version -// << ", version_hash=" << rs_meta->version_hash() -// << ", exist_sec=" << diff; -// } -// } - -// if (expired_versions.empty()) { -// return; -// } - -// for (auto& pair: expired_versions) { -// _delete_inc_rowset_by_version(pair.first, pair.second); -// VLOG(3) << "delete expire incremental data. tablet=" << full_name() -// << ", version=" << pair.first; -// } - -// save_meta(); -// } +void Tablet::delete_expired_inc_rowsets() { + int64_t now = UnixSeconds(); + vector> expired_versions; + WriteLock wrlock(&_meta_lock); + for (auto& rs_meta : _tablet_meta->all_inc_rs_metas()) { + double diff = ::difftime(now, rs_meta->creation_time()); + if (diff >= config::inc_rowset_expired_sec) { + Version version(rs_meta->version()); + expired_versions.push_back(std::make_pair(version, rs_meta->version_hash())); + VLOG(3) << "find expire incremental rowset. tablet=" << full_name() + << ", version=" << version + << ", version_hash=" << rs_meta->version_hash() + << ", exist_sec=" << diff; + } + } + + if (expired_versions.empty()) { + return; + } + + for (auto& pair: expired_versions) { + _delete_inc_rowset_by_version(pair.first, pair.second); + VLOG(3) << "delete expire incremental data. tablet=" << full_name() + << ", version=" << pair.first; + } + + save_meta(); +} void Tablet::delete_expired_stale_rowset() { diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 8f0885f8f525ef..4f1ba22b928087 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -100,6 +100,7 @@ class Tablet : public BaseTablet { // The caller must call hold _meta_lock when call this two function. const RowsetSharedPtr get_rowset_by_version(const Version& version) const; const RowsetSharedPtr get_inc_rowset_by_version(const Version& version) const; + const RowsetSharedPtr get_stale_rowset_by_version(const Version& version) const; const RowsetSharedPtr rowset_with_max_version() const; @@ -240,7 +241,7 @@ class Tablet : public BaseTablet { void _max_continuous_version_from_begining_unlocked(Version* version, VersionHash* v_hash) const ; RowsetSharedPtr _rowset_with_largest_size(); - // void _delete_inc_rowset_by_version(const Version& version, const VersionHash& version_hash); + void _delete_inc_rowset_by_version(const Version& version, const VersionHash& version_hash); /// Delete stale rowset by version. This method not only delete the version in expired rowset map, /// but also delete the version in rowset meta vector. void _delete_stale_rowset_by_version(const Version& version); diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 6b0d0f46514356..05aed150e26903 100755 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -381,6 +381,12 @@ void TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) { _inc_rs_metas.push_back(std::move(rs_meta)); } + for (auto& it : tablet_meta_pb.stale_rs_metas()) { + RowsetMetaSharedPtr rs_meta(new AlphaRowsetMeta()); + rs_meta->init_from_pb(it); + _stale_rs_metas.push_back(std::move(rs_meta)); + } + // generate AlterTabletTask if (tablet_meta_pb.has_alter_task()) { AlterTabletTask* alter_tablet_task = new AlterTabletTask(); @@ -431,6 +437,9 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) { for (auto rs : _inc_rs_metas) { rs->to_rowset_pb(tablet_meta_pb->add_inc_rs_metas()); } + for (auto rs : _stale_rs_metas) { + rs->to_rowset_pb(tablet_meta_pb->add_stale_rs_metas()); + } _schema.to_schema_pb(tablet_meta_pb->mutable_schema()); if (_alter_task != nullptr) { _alter_task->to_alter_pb(tablet_meta_pb->mutable_alter_task()); @@ -532,34 +541,34 @@ void TabletMeta::revise_rs_metas(std::vector&& rs_metas) { _rs_metas = std::move(rs_metas); } -void TabletMeta::revise_stale_rs_metas(std::vector&& stale_rs_metas) { +void TabletMeta::revise_inc_rs_metas(std::vector&& rs_metas) { WriteLock wrlock(&_meta_lock); // delete alter task _alter_task.reset(); - _stale_rs_metas = std::move(stale_rs_metas); + _inc_rs_metas = std::move(rs_metas); } -// void TabletMeta::revise_inc_rs_metas(std::vector&& rs_metas) { -// WriteLock wrlock(&_meta_lock); -// // delete alter task -// _alter_task.reset(); +void TabletMeta::revise_stale_rs_metas(std::vector&& rs_metas) { + WriteLock wrlock(&_meta_lock); + // delete alter task + _alter_task.reset(); -// _inc_rs_metas = std::move(rs_metas); -// } + _stale_rs_metas = std::move(rs_metas); +} -// OLAPStatus TabletMeta::add_inc_rs_meta(const RowsetMetaSharedPtr& rs_meta) { -// // check RowsetMeta is valid -// for (auto rs : _inc_rs_metas) { -// if (rs->version() == rs_meta->version()) { -// LOG(WARNING) << "rowset already exist. rowset_id=" << rs->rowset_id(); -// return OLAP_ERR_ROWSET_ALREADY_EXIST; -// } -// } +OLAPStatus TabletMeta::add_inc_rs_meta(const RowsetMetaSharedPtr& rs_meta) { + // check RowsetMeta is valid + for (auto rs : _inc_rs_metas) { + if (rs->version() == rs_meta->version()) { + LOG(WARNING) << "rowset already exist. rowset_id=" << rs->rowset_id(); + return OLAP_ERR_ROWSET_ALREADY_EXIST; + } + } -// _inc_rs_metas.push_back(rs_meta); -// return OLAP_SUCCESS; -// } + _inc_rs_metas.push_back(rs_meta); + return OLAP_SUCCESS; +} void TabletMeta::delete_stale_rs_meta_by_version(const Version& version) { auto it = _stale_rs_metas.begin(); diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 78d2c3a4874fbf..214958e815b073 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -171,9 +171,10 @@ class TabletMeta { const std::vector& to_delete); void revise_rs_metas(std::vector&& rs_metas); - + void revise_inc_rs_metas(std::vector&& rs_metas); void revise_stale_rs_metas(std::vector&& rs_metas); - // inline const std::vector& all_inc_rs_metas() const; + + inline const std::vector& all_inc_rs_metas() const; inline const std::vector& all_stale_rs_metas() const; OLAPStatus add_inc_rs_meta(const RowsetMetaSharedPtr& rs_meta); void delete_inc_rs_meta_by_version(const Version& version); @@ -224,7 +225,7 @@ class TabletMeta { TabletSchema _schema; std::vector _rs_metas; - // std::vector _inc_rs_metas; + std::vector _inc_rs_metas; // This variable _stale_rs_metas is used to record these rowsets‘ meta which are be compacted. // These stale rowsets meta are been removed when rowsets' pathVersion is expired, // this policy is judged and computed by TimestampedVersionTracker. diff --git a/be/src/olap/version_graph.cpp b/be/src/olap/version_graph.cpp index 8e49f112e648f7..f397dce7f9c335 100644 --- a/be/src/olap/version_graph.cpp +++ b/be/src/olap/version_graph.cpp @@ -45,17 +45,164 @@ void TimestampedVersionTracker::construct_versioned_tracker(const std::vector& rs_metas, - std::vector>& stale_rs_metas) { + const std::vector& stale_metas) { if (rs_metas.empty()) { VLOG(3) << "there is no version in the header."; return; } - construct_versioned_tracker(rs_metas); + _stale_version_path_map.clear(); + _next_path_id = 1; + _construct_versioned_tracker(rs_metas); - for (auto& path: stale_rs_metas) { - add_stale_path_version(path); + // init _stale_version_path_map + _init_stale_version_path_map(rs_metas, stale_metas); +} + +void TimestampedVersionTracker::_init_stale_version_path_map( + const std::vector& rs_metas, + const std::vector& stale_metas) { + + if (stale_metas.empty()) { + return; } + + // sort stale meta by version diff (second version - first version) + std::list sorted_stale_metas; + for (auto& rs : stale_metas) { + sorted_stale_metas.emplace_back(rs); + } + + // 1. Sort the existing rowsets by version in ascending order + sorted_stale_metas.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) { + // compare by version diff between version.first and version.second + int64_t a_diff = a->version().second - a->version().first; + int64_t b_diff = b->version().second - b->version().first; + + int diff = a_diff - b_diff; + if (diff < 0) { + return true; + } + else if (diff > 0) { + return false; + } + // when the version diff is equal, compare rowset createtime + return a->creation_time() < b->creation_time(); + }); + + // first_version -> (second_version -> rowset_meta) + std::unordered_map> stale_map; + + // 2. generate stale path from stale_metas. traverse sorted_stale_metas and each time add stale_meta to stale_map. + // when a stale path in stale_map can replace stale_meta in sorted_stale_metas, stale_map remove rowset_metas of a stale path + // and add the path to _stale_version_path_map. + for(auto& stale_meta:sorted_stale_metas) { + std::vector stale_path; + // 2.1 find a path in stale_map can replace current stale_meta version + bool r = _find_path_from_stale_map(stale_map, stale_meta->start_version(), stale_meta->end_version(), &stale_path); + + // 2.2 add stale_meta to stale_map + auto start_iter = stale_map.find(stale_meta->start_version()); + if (start_iter != stale_map.end()) { + start_iter->second[stale_meta->end_version()] = stale_meta; + } else { + std::unordered_map item; + item[stale_meta->end_version()] = stale_meta; + stale_map[stale_meta->start_version()] = std::move(item); + } + // 2.3 add version to version_graph + Version stale_meta_version = stale_meta->version(); + add_version(stale_meta_version); + // 2.4 find the path + if (r) { + // add the path to _stale_version_path_map + add_stale_path_version(stale_path); + // remove stale_path from stale_map + for (auto stale_item:stale_path) { + stale_map[stale_item->start_version()].erase(stale_item->end_version()); + + if (stale_map[stale_item->start_version()].empty()) { + stale_map.erase(stale_item->start_version()); + } + } + } + } + + // 3. generate stale path from rs_metas + for(auto& stale_meta:rs_metas) { + std::vector stale_path; + // 3.1 find a path in stale_map can replace current stale_meta version + bool r = _find_path_from_stale_map(stale_map, stale_meta->start_version(), stale_meta->end_version(), &stale_path); + + // 3.2 find the path + if (r) { + // add the path to _stale_version_path_map + add_stale_path_version(stale_path); + // remove stale_path from stale_map + for (auto stale_item:stale_path) { + stale_map[stale_item->start_version()].erase(stale_item->end_version()); + + if (stale_map[stale_item->start_version()].empty()) { + stale_map.erase(stale_item->start_version()); + } + } + } + } + + // 4. process remain stale rowset_meta in stale_map + auto map_iter = stale_map.begin(); + while (map_iter != stale_map.end()) { + auto second_iter = map_iter->second.begin(); + while(second_iter != map_iter->second.end()) { + // each remain stale rowset_meta generate a stale path + std::vector stale_path; + stale_path.push_back(second_iter->second); + add_stale_path_version(stale_path); + + second_iter++; + } + map_iter++; + } +} + +bool TimestampedVersionTracker::_find_path_from_stale_map( + const std::unordered_map>& stale_map, + int64_t first_version, int64_t second_version, std::vector* stale_path) { + + auto first_iter = stale_map.find(first_version); + // if first_version not in stale_map, there is no path. + if (first_iter == stale_map.end()) { + return false; + } + auto& second_version_map = first_iter->second; + auto second_iter = second_version_map.find(second_version); + // if second_version in stale_map, find a path. + if (second_iter != second_version_map.end()) { + auto row_meta = second_iter->second; + // add rowset to path + stale_path->push_back(row_meta); + return true; + } + + // traverse the first version map to backtracking _find_path_from_stale_map + auto map_iter = second_version_map.begin(); + while (map_iter != second_version_map.end()) { + // the version greater than second_version, we can't find path in stale_map + if (map_iter->first > second_version) { + continue; + } + // backtracking _find_path_from_stale_map find from map_iter->first + 1 to second_version + stale_path->push_back(map_iter->second); + bool r = _find_path_from_stale_map(stale_map, map_iter->first + 1, second_version, stale_path); + if (r) { + return true; + } + // there is no path in current version, pop and continue + stale_path->pop_back(); + map_iter++; + } + + return false; } void TimestampedVersionTracker::get_stale_version_path_json_doc(rapidjson::Document& path_arr) { diff --git a/be/src/olap/version_graph.h b/be/src/olap/version_graph.h index f49641347f81b4..e051fbc4c5fcf3 100644 --- a/be/src/olap/version_graph.h +++ b/be/src/olap/version_graph.h @@ -136,13 +136,12 @@ using PathVersionListSharedPtr = std::shared_ptr& rs_metas); - /// Construct rowsets version tracker by rs_metas and stale version path map and stale rs meta - void construct_versioned_tracker( - const std::vector& rs_metas, - const std::vector& stale_metas)); + /// Construct rowsets version tracker by main path rowset meta and stale rowset meta. + void construct_versioned_tracker(const std::vector& rs_metas, + const std::vector& stale_metas); /// Recover rowsets version tracker from stale version path map. When delete operation fails, the /// tracker can be recovered from deleted stale_version_path_map. @@ -185,9 +184,19 @@ class TimestampedVersionTracker { void get_stale_version_path_json_doc(rapidjson::Document& path_arr); private: - /// Construct rowsets version tracker with stale rowsets. + /// Construct rowsets version tracker with main path rowset meta. void _construct_versioned_tracker(const std::vector& rs_metas); + /// init stale_version_path_map by main path rowset meta and stale rowset meta. + void _init_stale_version_path_map(const std::vector& rs_metas, + const std::vector& stale_metas); + + /// find a path in stale_map from first_version to second_version, stale_path is used as result. + bool _find_path_from_stale_map( + const std::unordered_map>& stale_map, + int64_t first_version, int64_t second_version, + std::vector* stale_path); + private: // This variable records the id of path version which will be dispatched to next path version, // it is not persisted. diff --git a/be/test/olap/timestamped_version_tracker_test.cpp b/be/test/olap/timestamped_version_tracker_test.cpp index 9b4ce966483b3b..3fb3f9ed39c4f4 100644 --- a/be/test/olap/timestamped_version_tracker_test.cpp +++ b/be/test/olap/timestamped_version_tracker_test.cpp @@ -480,6 +480,23 @@ TEST_F(TestTimestampedVersionTracker, construct_versioned_tracker) { ASSERT_EQ(1, tracker._next_path_id); } +TEST_F(TestTimestampedVersionTracker, construct_version_tracker_by_stale_meta) { + + std::vector rs_metas; + std::vector expried_rs_metas; + std::vector version_path; + + init_all_rs_meta(&rs_metas); + init_expried_row_rs_meta(&expried_rs_metas); + + TimestampedVersionTracker tracker; + tracker.construct_versioned_tracker(rs_metas, expried_rs_metas); + + ASSERT_EQ(10, tracker._version_graph._version_graph.size()); + ASSERT_EQ(4, tracker._stale_version_path_map.size()); + ASSERT_EQ(5, tracker._next_path_id); +} + TEST_F(TestTimestampedVersionTracker, construct_versioned_tracker_with_same_rowset) { std::vector rs_metas; diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index d4f9cf758523de..72805b8a04671a 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -308,6 +308,7 @@ message TabletMetaPB { optional int64 end_rowset_id = 15; optional RowsetTypePB preferred_rowset_type = 16; optional TabletTypePB tablet_type = 17; + repeated RowsetMetaPB stale_rs_metas = 18; } message OLAPIndexHeaderMessage { From ae440416d05761c8c832430061936a8aa0b7b22f Mon Sep 17 00:00:00 2001 From: ZhangYu0123 Date: Wed, 26 Aug 2020 14:09:05 +0800 Subject: [PATCH 3/5] fix --- be/src/olap/tablet.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 59fe0cedd3fb90..ef47a51fbb857b 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -63,7 +63,7 @@ Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir, _last_base_compaction_success_millis(0), _cumulative_point(K_INVALID_CUMULATIVE_POINT), _cumulative_compaction_type(cumulative_compaction_type) { - // construct _timestamped_versioned_tracker + // construct _timestamped_versioned_tracker from rs and stale rs meta _timestamped_version_tracker.construct_versioned_tracker(_tablet_meta->all_rs_metas(), _tablet_meta->all_stale_rs_metas()); } From 5bf6a83f123e5b4a7546fd848edb6eff0b274696 Mon Sep 17 00:00:00 2001 From: ZhangYu0123 Date: Wed, 26 Aug 2020 23:28:06 +0800 Subject: [PATCH 4/5] fix --- be/src/olap/tablet.cpp | 16 +++++++--------- be/src/olap/tablet_meta.cpp | 8 -------- be/src/olap/tablet_meta.h | 1 - 3 files changed, 7 insertions(+), 18 deletions(-) diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index ef47a51fbb857b..80c95161da60bd 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -112,15 +112,13 @@ OLAPStatus Tablet::_init_once_action() { // init stale rowset for (auto& stale_rs_meta : _tablet_meta->all_stale_rs_metas()) { Version version = stale_rs_meta->version(); - RowsetSharedPtr rowset = get_stale_rowset_by_version(version); - if (rowset == nullptr) { - res = RowsetFactory::create_rowset(&_schema, _tablet_path, stale_rs_meta, &rowset); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "fail to init stale rowset. tablet_id:" << tablet_id() - << ", schema_hash:" << schema_hash() << ", version=" << version - << ", res:" << res; - return res; - } + RowsetSharedPtr rowset; + res = RowsetFactory::create_rowset(&_schema, _tablet_path, stale_rs_meta, &rowset); + if (res != OLAP_SUCCESS) { + LOG(WARNING) << "fail to init stale rowset. tablet_id:" << tablet_id() + << ", schema_hash:" << schema_hash() << ", version=" << version + << ", res:" << res; + return res; } _stale_rs_version_map[version] = std::move(rowset); } diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 05aed150e26903..15801facbdf808 100755 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -549,14 +549,6 @@ void TabletMeta::revise_inc_rs_metas(std::vector&& rs_metas _inc_rs_metas = std::move(rs_metas); } -void TabletMeta::revise_stale_rs_metas(std::vector&& rs_metas) { - WriteLock wrlock(&_meta_lock); - // delete alter task - _alter_task.reset(); - - _stale_rs_metas = std::move(rs_metas); -} - OLAPStatus TabletMeta::add_inc_rs_meta(const RowsetMetaSharedPtr& rs_meta) { // check RowsetMeta is valid for (auto rs : _inc_rs_metas) { diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 214958e815b073..2058d72dff107e 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -172,7 +172,6 @@ class TabletMeta { void revise_rs_metas(std::vector&& rs_metas); void revise_inc_rs_metas(std::vector&& rs_metas); - void revise_stale_rs_metas(std::vector&& rs_metas); inline const std::vector& all_inc_rs_metas() const; inline const std::vector& all_stale_rs_metas() const; From 08c07ce53af19c72d9d4dfd998fca76746029fb9 Mon Sep 17 00:00:00 2001 From: ZhangYu0123 Date: Fri, 28 Aug 2020 16:48:19 +0800 Subject: [PATCH 5/5] fix --- be/src/olap/tablet.cpp | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 80c95161da60bd..be092ca51490f3 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -919,6 +919,12 @@ bool Tablet::check_path(const std::string& path_to_check) const { return true; } } + for (auto& stale_version_rowset : _stale_rs_version_map) { + bool ret = stale_version_rowset.second->check_path(path_to_check); + if (ret) { + return true; + } + } return false; } @@ -943,6 +949,11 @@ bool Tablet::check_rowset_id(const RowsetId& rowset_id) { return true; } } + for (auto& stale_version_rowset : _stale_rs_version_map) { + if (stale_version_rowset.second->rowset_id() == rowset_id) { + return true; + } + } if (RowsetMetaManager::check_rowset_meta(_data_dir->get_meta(), tablet_uid(), rowset_id)) { return true; } @@ -1144,6 +1155,14 @@ bool Tablet::rowset_meta_is_useful(RowsetMetaSharedPtr rowset_meta) { find_version = true; } } + for (auto& stale_version_rowset : _stale_rs_version_map) { + if (stale_version_rowset.second->rowset_id() == rowset_meta->rowset_id()) { + find_rowset_id = true; + } + if (stale_version_rowset.second->contains_version(rowset_meta->version())) { + find_version = true; + } + } return find_rowset_id || !find_version; }