diff --git a/be/src/http/action/download_binlog_action.cpp b/be/src/http/action/download_binlog_action.cpp index 7548328a8355bb..a23d5ec109f907 100644 --- a/be/src/http/action/download_binlog_action.cpp +++ b/be/src/http/action/download_binlog_action.cpp @@ -134,7 +134,7 @@ void handle_get_rowset_meta(HttpRequest* req) { auto tablet = get_tablet(tablet_id); const auto& rowset_id = get_http_param(req, kRowsetIdParameter); const auto& binlog_version = get_http_param(req, kBinlogVersionParameter); - auto rowset_meta = tablet->get_binlog_rowset_meta(binlog_version, rowset_id); + auto rowset_meta = tablet->get_rowset_binlog_meta(binlog_version, rowset_id); if (rowset_meta.empty()) { HttpChannel::send_reply(req, HttpStatus::NOT_FOUND, fmt::format("get rowset meta failed, rowset_id={}", rowset_id)); diff --git a/be/src/olap/binlog.h b/be/src/olap/binlog.h index 57853bd6db0888..b079383cc79fb6 100644 --- a/be/src/olap/binlog.h +++ b/be/src/olap/binlog.h @@ -29,13 +29,13 @@ constexpr std::string_view kBinlogPrefix = "binlog_"; constexpr std::string_view kBinlogMetaPrefix = "binlog_meta_"; constexpr std::string_view kBinlogDataPrefix = "binlog_data_"; -inline auto make_binlog_meta_key(std::string_view tablet, int64_t version, - std::string_view rowset) { +inline auto make_binlog_meta_key(const std::string_view tablet, int64_t version, + const std::string_view rowset) { return fmt::format("{}meta_{}_{:020d}_{}", kBinlogPrefix, tablet, version, rowset); } -inline auto make_binlog_meta_key(std::string_view tablet, std::string_view version_str, - std::string_view rowset) { +inline auto make_binlog_meta_key(const std::string_view tablet, const std::string_view version_str, + const std::string_view rowset) { // TODO(Drogon): use fmt::format not convert to version_num, only string with length prefix '0' int64_t version = std::atoll(version_str.data()); return make_binlog_meta_key(tablet, version, rowset); @@ -46,13 +46,21 @@ inline auto make_binlog_meta_key(const TabletUid& tablet_uid, int64_t version, return make_binlog_meta_key(tablet_uid.to_string(), version, rowset_id.to_string()); } -inline auto make_binlog_data_key(std::string_view tablet, int64_t version, - std::string_view rowset) { +inline auto make_binlog_meta_key_prefix(const TabletUid& tablet_uid) { + return fmt::format("{}meta_{}_", kBinlogPrefix, tablet_uid.to_string()); +} + +inline auto make_binlog_meta_key_prefix(const TabletUid& tablet_uid, int64_t version) { + return fmt::format("{}meta_{}_{:020d}_", kBinlogPrefix, tablet_uid.to_string(), version); +} + +inline auto make_binlog_data_key(const std::string_view tablet, int64_t version, + const std::string_view rowset) { return fmt::format("{}data_{}_{:020d}_{}", kBinlogPrefix, tablet, version, rowset); } -inline auto make_binlog_data_key(std::string_view tablet, std::string_view version, - std::string_view rowset) { +inline auto make_binlog_data_key(const std::string_view tablet, const std::string_view version, + const std::string_view rowset) { return fmt::format("{}data_{}_{:0>20}_{}", kBinlogPrefix, tablet, version, rowset); } @@ -61,19 +69,20 @@ inline auto make_binlog_data_key(const TabletUid& tablet_uid, int64_t version, return make_binlog_data_key(tablet_uid.to_string(), version, rowset_id.to_string()); } -inline auto make_binlog_filename_key(const TabletUid& tablet_uid, std::string_view version) { - return fmt::format("{}meta_{}_{:0>20}_", kBinlogPrefix, tablet_uid.to_string(), version); +inline auto make_binlog_data_key(const TabletUid& tablet_uid, int64_t version, + const std::string_view rowset_id) { + return make_binlog_data_key(tablet_uid.to_string(), version, rowset_id); } -inline auto make_binlog_meta_key_prefix(const TabletUid& tablet_uid) { - return fmt::format("{}meta_{}_", kBinlogPrefix, tablet_uid.to_string()); +inline auto make_binlog_data_key_prefix(const TabletUid& tablet_uid, int64_t version) { + return fmt::format("{}data_{}_{:020d}_", kBinlogPrefix, tablet_uid.to_string(), version); } -inline auto make_binlog_meta_key_prefix(const TabletUid& tablet_uid, int64_t version) { - return fmt::format("{}meta_{}_{:020d}_", kBinlogPrefix, tablet_uid.to_string(), version); +inline auto make_binlog_filename_key(const TabletUid& tablet_uid, const std::string_view version) { + return fmt::format("{}meta_{}_{:0>20}_", kBinlogPrefix, tablet_uid.to_string(), version); } -inline bool starts_with_binlog_meta(std::string_view str) { +inline bool starts_with_binlog_meta(const std::string_view str) { auto prefix = kBinlogMetaPrefix; if (prefix.length() > str.length()) { return false; @@ -82,7 +91,7 @@ inline bool starts_with_binlog_meta(std::string_view str) { return str.compare(0, prefix.length(), prefix) == 0; } -inline std::string get_binlog_data_key_from_meta_key(std::string_view meta_key) { +inline std::string get_binlog_data_key_from_meta_key(const std::string_view meta_key) { // like "binlog_meta_6943f1585fe834b5-e542c2b83a21d0b7" => "binlog_data-6943f1585fe834b5-e542c2b83a21d0b7" return fmt::format("{}data_{}", kBinlogPrefix, meta_key.substr(kBinlogMetaPrefix.length())); } diff --git a/be/src/olap/pb_helper.h b/be/src/olap/pb_helper.h new file mode 100644 index 00000000000000..0819eb16c52907 --- /dev/null +++ b/be/src/olap/pb_helper.h @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "common/status.h" + +namespace doris { +template +Status read_pb(const std::string& pb_filename, PB* pb) { + std::fstream pb_file_stream(pb_filename, std::ios::in | std::ios::binary); + if (pb_file_stream.bad()) { + auto err_msg = fmt::format("fail to open rowset binlog metas file. [path={}]", pb_filename); + LOG(WARNING) << err_msg; + return Status::InternalError(err_msg); + } + + if (pb->ParseFromIstream(&pb_file_stream)) { + return Status::OK(); + } + return Status::InternalError("fail to parse rowset binlog metas file"); +} + +template +Status write_pb(const std::string& pb_filename, const PB& pb) { + std::fstream pb_file_stream(pb_filename, std::ios::out | std::ios::trunc | std::ios::binary); + if (pb_file_stream.bad()) { + auto err_msg = fmt::format("fail to open rowset binlog metas file. [path={}]", pb_filename); + LOG(WARNING) << err_msg; + return Status::InternalError(err_msg); + } + + if (!pb.SerializeToOstream(&pb_file_stream)) { + auto err_msg = + fmt::format("fail to save rowset binlog metas to file. [path={}]", pb_filename); + LOG(WARNING) << err_msg; + return Status::InternalError(err_msg); + } + + if (!pb_file_stream.flush()) { + auto err_msg = + fmt::format("fail to flush rowset binlog metas to file. [path={}]", pb_filename); + LOG(WARNING) << err_msg; + return Status::InternalError(err_msg); + } + + pb_file_stream.close(); + if (pb_file_stream.bad()) { + auto err_msg = + fmt::format("fail to close rowset binlog metas file. [path={}]", pb_filename); + LOG(WARNING) << err_msg; + return Status::InternalError(err_msg); + } + + return Status::OK(); +} +} // namespace doris diff --git a/be/src/olap/rowset/rowset_meta_manager.cpp b/be/src/olap/rowset/rowset_meta_manager.cpp index 7047b965dc80cc..f879aaf89957f9 100644 --- a/be/src/olap/rowset/rowset_meta_manager.cpp +++ b/be/src/olap/rowset/rowset_meta_manager.cpp @@ -257,7 +257,7 @@ std::pair RowsetMetaManager::get_binlog_info( return std::make_pair(rowset_id, num_segments); } -std::string RowsetMetaManager::get_binlog_rowset_meta(OlapMeta* meta, TabletUid tablet_uid, +std::string RowsetMetaManager::get_rowset_binlog_meta(OlapMeta* meta, TabletUid tablet_uid, std::string_view binlog_version, std::string_view rowset_id) { auto binlog_data_key = make_binlog_data_key(tablet_uid.to_string(), binlog_version, rowset_id); @@ -275,6 +275,135 @@ std::string RowsetMetaManager::get_binlog_rowset_meta(OlapMeta* meta, TabletUid return binlog_meta_value; } +Status RowsetMetaManager::get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid, + const std::vector& binlog_versions, + RowsetBinlogMetasPB* metas_pb) { + if (binlog_versions.empty()) { + return _get_all_rowset_binlog_metas(meta, tablet_uid, metas_pb); + } else { + return _get_rowset_binlog_metas(meta, tablet_uid, binlog_versions, metas_pb); + } +} + +Status RowsetMetaManager::_get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid, + const std::vector& binlog_versions, + RowsetBinlogMetasPB* metas_pb) { + Status status; + auto tablet_uid_str = tablet_uid.to_string(); + auto traverse_func = [meta, metas_pb, &status, &tablet_uid_str]( + const std::string& key, const std::string& value) -> bool { + VLOG_DEBUG << fmt::format("key:{}, value:{}", key, value); + if (!starts_with_binlog_meta(key)) { + auto err_msg = fmt::format("invalid binlog meta key:{}", key); + status = Status::InternalError(err_msg); + LOG(WARNING) << err_msg; + return false; + } + + BinlogMetaEntryPB binlog_meta_entry_pb; + if (!binlog_meta_entry_pb.ParseFromString(value)) { + auto err_msg = fmt::format("fail to parse binlog meta value:{}", value); + status = Status::InternalError(err_msg); + LOG(WARNING) << err_msg; + return false; + } + auto& rowset_id = binlog_meta_entry_pb.rowset_id_v2(); + + auto binlog_meta_pb = metas_pb->add_rowset_binlog_metas(); + binlog_meta_pb->set_rowset_id(rowset_id); + binlog_meta_pb->set_version(binlog_meta_entry_pb.version()); + binlog_meta_pb->set_num_segments(binlog_meta_entry_pb.num_segments()); + binlog_meta_pb->set_meta_key(key); + binlog_meta_pb->set_meta(value); + + auto binlog_data_key = + make_binlog_data_key(tablet_uid_str, binlog_meta_entry_pb.version(), rowset_id); + std::string binlog_data; + status = meta->get(META_COLUMN_FAMILY_INDEX, binlog_data_key, &binlog_data); + if (!status.OK()) { + LOG(WARNING) << status.to_string(); + return false; + } + binlog_meta_pb->set_data_key(binlog_data_key); + binlog_meta_pb->set_data(binlog_data); + + return false; + }; + + for (auto& binlog_version : binlog_versions) { + auto prefix_key = make_binlog_meta_key_prefix(tablet_uid, binlog_version); + Status iterStatus = meta->iterate(META_COLUMN_FAMILY_INDEX, prefix_key, traverse_func); + if (!iterStatus.ok()) { + LOG(WARNING) << fmt::format("fail to iterate binlog meta. prefix_key:{}, status:{}", + prefix_key, iterStatus.to_string()); + return iterStatus; + } + if (!status.ok()) { + return status; + } + } + return status; +} + +Status RowsetMetaManager::_get_all_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid, + RowsetBinlogMetasPB* metas_pb) { + Status status; + auto tablet_uid_str = tablet_uid.to_string(); + int64_t tablet_id = 0; + auto traverse_func = [meta, metas_pb, &status, &tablet_uid_str, &tablet_id]( + const std::string& key, const std::string& value) -> bool { + VLOG_DEBUG << fmt::format("key:{}, value:{}", key, value); + if (!starts_with_binlog_meta(key)) { + LOG(INFO) << fmt::format("end scan binlog meta. key:{}", key); + return false; + } + + BinlogMetaEntryPB binlog_meta_entry_pb; + if (!binlog_meta_entry_pb.ParseFromString(value)) { + auto err_msg = fmt::format("fail to parse binlog meta value:{}", value); + status = Status::InternalError(err_msg); + LOG(WARNING) << err_msg; + return false; + } + if (tablet_id == 0) { + tablet_id = binlog_meta_entry_pb.tablet_id(); + } else if (tablet_id != binlog_meta_entry_pb.tablet_id()) { + // scan all binlog meta, so tablet_id should be same: + return false; + } + auto& rowset_id = binlog_meta_entry_pb.rowset_id_v2(); + + auto binlog_meta_pb = metas_pb->add_rowset_binlog_metas(); + binlog_meta_pb->set_rowset_id(rowset_id); + binlog_meta_pb->set_version(binlog_meta_entry_pb.version()); + binlog_meta_pb->set_num_segments(binlog_meta_entry_pb.num_segments()); + binlog_meta_pb->set_meta_key(key); + binlog_meta_pb->set_meta(value); + + auto binlog_data_key = + make_binlog_data_key(tablet_uid_str, binlog_meta_entry_pb.version(), rowset_id); + std::string binlog_data; + status = meta->get(META_COLUMN_FAMILY_INDEX, binlog_data_key, &binlog_data); + if (!status.OK()) { + LOG(WARNING) << status.to_string(); + return false; + } + binlog_meta_pb->set_data_key(binlog_data_key); + binlog_meta_pb->set_data(binlog_data); + + return true; + }; + + auto prefix_key = make_binlog_meta_key_prefix(tablet_uid); + Status iterStatus = meta->iterate(META_COLUMN_FAMILY_INDEX, prefix_key, traverse_func); + if (!iterStatus.ok()) { + LOG(WARNING) << fmt::format("fail to iterate binlog meta. prefix_key:{}, status:{}", + prefix_key, iterStatus.to_string()); + return iterStatus; + } + return status; +} + Status RowsetMetaManager::remove(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id) { std::string key = ROWSET_PREFIX + tablet_uid.to_string() + "_" + rowset_id.to_string(); VLOG_NOTICE << "start to remove rowset, key:" << key; @@ -288,6 +417,27 @@ Status RowsetMetaManager::remove_binlog(OlapMeta* meta, const std::string& suffi {kBinlogMetaPrefix.data() + suffix, kBinlogDataPrefix.data() + suffix}); } +Status RowsetMetaManager::ingest_binlog_metas(OlapMeta* meta, TabletUid tablet_uid, + RowsetBinlogMetasPB* metas_pb) { + std::vector entries; + const auto tablet_uid_str = tablet_uid.to_string(); + + for (auto& rowset_binlog_meta : *metas_pb->mutable_rowset_binlog_metas()) { + auto& rowset_id = rowset_binlog_meta.rowset_id(); + auto version = rowset_binlog_meta.version(); + + auto meta_key = rowset_binlog_meta.mutable_meta_key(); + *meta_key = make_binlog_meta_key(tablet_uid_str, version, rowset_id); + auto data_key = rowset_binlog_meta.mutable_data_key(); + *data_key = make_binlog_data_key(tablet_uid_str, version, rowset_id); + + entries.emplace_back(*meta_key, rowset_binlog_meta.meta()); + entries.emplace_back(*data_key, rowset_binlog_meta.data()); + } + + return meta->put(META_COLUMN_FAMILY_INDEX, entries); +} + Status RowsetMetaManager::traverse_rowset_metas( OlapMeta* meta, std::function const& func) { diff --git a/be/src/olap/rowset/rowset_meta_manager.h b/be/src/olap/rowset/rowset_meta_manager.h index f0e834790b108f..0c04cb686c5f31 100644 --- a/be/src/olap/rowset/rowset_meta_manager.h +++ b/be/src/olap/rowset/rowset_meta_manager.h @@ -53,31 +53,39 @@ class RowsetMetaManager { const RowsetMetaPB& rowset_meta_pb, bool enable_binlog); static Status save(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id, const RowsetMetaPB& rowset_meta_pb); + static std::vector get_binlog_filenames(OlapMeta* meta, TabletUid tablet_uid, std::string_view binlog_version, int64_t segment_idx); static std::pair get_binlog_info(OlapMeta* meta, TabletUid tablet_uid, std::string_view binlog_version); - static std::string get_binlog_rowset_meta(OlapMeta* meta, TabletUid tablet_uid, + static std::string get_rowset_binlog_meta(OlapMeta* meta, TabletUid tablet_uid, std::string_view binlog_version, std::string_view rowset_id); - - static Status remove(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id); - + static Status get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid, + const std::vector& binlog_versions, + RowsetBinlogMetasPB* metas_pb); static Status remove_binlog(OlapMeta* meta, const std::string& suffix); - + static Status ingest_binlog_metas(OlapMeta* meta, TabletUid tablet_uid, + RowsetBinlogMetasPB* metas_pb); static Status traverse_rowset_metas(OlapMeta* meta, std::function const& collector); - static Status traverse_binlog_metas( OlapMeta* meta, std::function const& func); + static Status remove(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id); + static Status load_json_rowset_meta(OlapMeta* meta, const std::string& rowset_meta_path); private: static Status _save_with_binlog(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id, const RowsetMetaPB& rowset_meta_pb); + static Status _get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid, + const std::vector& binlog_versions, + RowsetBinlogMetasPB* metas_pb); + static Status _get_all_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid, + RowsetBinlogMetasPB* metas_pb); }; } // namespace doris diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp index 73766b7090a0f4..e5b1aa8bcaf89b 100644 --- a/be/src/olap/snapshot_manager.cpp +++ b/be/src/olap/snapshot_manager.cpp @@ -42,6 +42,7 @@ #include "olap/data_dir.h" #include "olap/olap_common.h" #include "olap/olap_define.h" +#include "olap/pb_helper.h" #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_factory.h" #include "olap/rowset/rowset_meta.h" @@ -383,6 +384,8 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet return res; } + bool is_copy_binlog = request.__isset.is_copy_binlog ? request.is_copy_binlog : false; + // schema_full_path_desc.filepath: // /snapshot_id_path/tablet_id/schema_hash/ auto schema_full_path = get_schema_hash_full_path(ref_tablet, snapshot_id_path); @@ -597,6 +600,64 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet } while (false); + // link all binlog files to snapshot path + do { + if (!res.ok()) { + break; + } + + if (!is_copy_binlog) { + break; + } + + RowsetBinlogMetasPB rowset_binlog_metas_pb; + if (request.__isset.missing_version) { + res = ref_tablet->get_rowset_binlog_metas(request.missing_version, + &rowset_binlog_metas_pb); + } else { + std::vector missing_versions; + res = ref_tablet->get_rowset_binlog_metas(missing_versions, &rowset_binlog_metas_pb); + } + if (!res.ok()) { + break; + } + if (rowset_binlog_metas_pb.rowset_binlog_metas_size() == 0) { + break; + } + + // write to pb file + auto rowset_binlog_metas_pb_filename = + fmt::format("{}/rowset_binlog_metas.pb", schema_full_path); + res = write_pb(rowset_binlog_metas_pb_filename, rowset_binlog_metas_pb); + if (!res.ok()) { + break; + } + + for (auto& rowset_binlog_meta : rowset_binlog_metas_pb.rowset_binlog_metas()) { + std::string segment_file_path; + auto num_segments = rowset_binlog_meta.num_segments(); + std::string_view rowset_id = rowset_binlog_meta.rowset_id(); + + for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) { + segment_file_path = ref_tablet->get_segment_filepath(rowset_id, segment_index); + auto snapshot_segment_file_path = + fmt::format("{}/{}_{}.binlog", schema_full_path, rowset_id, segment_index); + + res = io::global_local_filesystem()->link_file(segment_file_path, + snapshot_segment_file_path); + if (!res.ok()) { + LOG(WARNING) << "fail to link binlog file. [src=" << segment_file_path + << ", dest=" << snapshot_segment_file_path << "]"; + break; + } + } + + if (!res.ok()) { + break; + } + } + } while (false); + if (!res.ok()) { LOG(WARNING) << "fail to make snapshot, try to delete the snapshot path. path=" << snapshot_id_path.c_str(); diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 7f03f726daaa03..9bbabb48c699c5 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -3492,17 +3492,27 @@ std::pair Tablet::get_binlog_info(std::string_view binlog_ return RowsetMetaManager::get_binlog_info(_data_dir->get_meta(), tablet_uid(), binlog_version); } -std::string Tablet::get_binlog_rowset_meta(std::string_view binlog_version, +std::string Tablet::get_rowset_binlog_meta(std::string_view binlog_version, std::string_view rowset_id) const { - return RowsetMetaManager::get_binlog_rowset_meta(_data_dir->get_meta(), tablet_uid(), + return RowsetMetaManager::get_rowset_binlog_meta(_data_dir->get_meta(), tablet_uid(), binlog_version, rowset_id); } +Status Tablet::get_rowset_binlog_metas(const std::vector& binlog_versions, + RowsetBinlogMetasPB* metas_pb) { + return RowsetMetaManager::get_rowset_binlog_metas(_data_dir->get_meta(), tablet_uid(), + binlog_versions, metas_pb); +} + std::string Tablet::get_segment_filepath(std::string_view rowset_id, std::string_view segment_index) const { return fmt::format("{}/_binlog/{}_{}.dat", _tablet_path, rowset_id, segment_index); } +std::string Tablet::get_segment_filepath(std::string_view rowset_id, int64_t segment_index) const { + return fmt::format("{}/_binlog/{}_{}.dat", _tablet_path, rowset_id, segment_index); +} + std::vector Tablet::get_binlog_filepath(std::string_view binlog_version) const { const auto& [rowset_id, num_segments] = get_binlog_info(binlog_version); std::vector binlog_filepath; @@ -3608,6 +3618,10 @@ void Tablet::gc_binlogs(int64_t version) { } } +Status Tablet::ingest_binlog_metas(RowsetBinlogMetasPB* metas_pb) { + return RowsetMetaManager::ingest_binlog_metas(_data_dir->get_meta(), tablet_uid(), metas_pb); +} + Status Tablet::calc_delete_bitmap_between_segments( RowsetSharedPtr rowset, const std::vector& segments, DeleteBitmapPtr delete_bitmap) { diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 3efd2c89ce19f8..0b0c20719cf349 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -72,6 +72,7 @@ class TabletMetaPB; class TupleDescriptor; class CalcDeleteBitmapToken; enum CompressKind : int; +class RowsetBinlogMetasPB; namespace io { class RemoteFileSystem; @@ -520,12 +521,16 @@ class Tablet : public BaseTablet { std::vector get_binlog_filepath(std::string_view binlog_version) const; std::pair get_binlog_info(std::string_view binlog_version) const; - std::string get_binlog_rowset_meta(std::string_view binlog_version, + std::string get_rowset_binlog_meta(std::string_view binlog_version, std::string_view rowset_id) const; + Status get_rowset_binlog_metas(const std::vector& binlog_versions, + RowsetBinlogMetasPB* metas_pb); std::string get_segment_filepath(std::string_view rowset_id, std::string_view segment_index) const; + std::string get_segment_filepath(std::string_view rowset_id, int64_t segment_index) const; bool can_add_binlog(uint64_t total_binlog_size) const; void gc_binlogs(int64_t version); + Status ingest_binlog_metas(RowsetBinlogMetasPB* metas_pb); inline void increase_io_error_times() { ++_io_error_times; } diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 36d19cf8ede3a6..76a749c655ec39 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -44,7 +44,10 @@ #include "olap/data_dir.h" #include "olap/olap_common.h" #include "olap/olap_define.h" +#include "olap/olap_meta.h" +#include "olap/pb_helper.h" #include "olap/rowset/rowset.h" +#include "olap/rowset/rowset_meta_manager.h" #include "olap/storage_engine.h" #include "olap/tablet.h" #include "olap/tablet_meta.h" @@ -871,12 +874,59 @@ Status TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id, return Status::Error( "fail to load tablet_meta. file_path={}", header_path); } + TabletUid tablet_uid = TabletUid::gen_uid(); + + // remove rowset binlog metas + auto binlog_metas_file = fmt::format("{}/rowset_binlog_metas.pb", schema_hash_path); + bool binlog_metas_file_exists = false; + auto file_exists_status = + io::global_local_filesystem()->exists(binlog_metas_file, &binlog_metas_file_exists); + if (!file_exists_status.ok()) { + return file_exists_status; + } + bool contain_binlog = false; + RowsetBinlogMetasPB rowset_binlog_metas_pb; + if (binlog_metas_file_exists) { + auto binlog_meta_filesize = std::filesystem::file_size(binlog_metas_file); + if (binlog_meta_filesize > 0) { + contain_binlog = true; + RETURN_IF_ERROR(read_pb(binlog_metas_file, &rowset_binlog_metas_pb)); + } + RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(binlog_metas_file)); + } + if (contain_binlog) { + auto binlog_dir = fmt::format("{}/_binlog", schema_hash_path); + RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(binlog_dir)); + + std::vector files; + RETURN_IF_ERROR( + io::global_local_filesystem()->list(schema_hash_path, true, &files, &exists)); + for (auto& file : files) { + auto& filename = file.file_name; + if (!filename.ends_with(".binlog")) { + continue; + } + + // change clone_file suffix .binlog to .dat + std::string new_filename = filename; + new_filename.replace(filename.size() - 7, 7, ".dat"); + auto from = fmt::format("{}/{}", schema_hash_path, filename); + auto to = fmt::format("{}/_binlog/{}", schema_hash_path, new_filename); + RETURN_IF_ERROR(io::global_local_filesystem()->rename(from, to)); + } + + auto meta = store->get_meta(); + // if ingest binlog metas error, it will be gc in gc_unused_binlog_metas + RETURN_IF_ERROR( + RowsetMetaManager::ingest_binlog_metas(meta, tablet_uid, &rowset_binlog_metas_pb)); + } + // has to change shard id here, because meta file maybe copied from other source // its shard is different from local shard tablet_meta->set_shard_id(shard); // load dir is called by clone, restore, storage migration // should change tablet uid when tablet object changed - tablet_meta->set_tablet_uid(TabletUid::gen_uid()); + tablet_meta->set_tablet_uid(std::move(tablet_uid)); std::string meta_binary; tablet_meta->serialize(&meta_binary); RETURN_NOT_OK_STATUS_WITH_WARN( diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index 1f1c04ec706547..19143e1079f24b 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -50,6 +50,7 @@ #include "olap/data_dir.h" #include "olap/olap_common.h" #include "olap/olap_define.h" +#include "olap/pb_helper.h" #include "olap/rowset/rowset.h" #include "olap/snapshot_manager.h" #include "olap/storage_engine.h" @@ -354,6 +355,7 @@ Status EngineCloneTask::_make_snapshot(const std::string& ip, int port, TTableId request.__set_schema_hash(schema_hash); request.__set_preferred_snapshot_version(g_Types_constants.TPREFER_SNAPSHOT_REQ_VERSION); request.__set_version(_clone_req.committed_version); + request.__set_is_copy_binlog(true); // TODO: missing version composed of singleton delta. // if not, this place should be rewrote. // we make every TSnapshotRequest sent from be with __isset.missing_version = true @@ -537,6 +539,30 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, const std::string& clone_d // remove the cloned meta file RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(cloned_tablet_meta_file)); + // remove rowset binlog metas + const auto& tablet_dir = tablet->tablet_path(); + auto binlog_metas_file = fmt::format("{}/rowset_binlog_metas.pb", clone_dir); + bool binlog_metas_file_exists = false; + auto file_exists_status = + io::global_local_filesystem()->exists(binlog_metas_file, &binlog_metas_file_exists); + if (!file_exists_status.ok()) { + return file_exists_status; + } + bool contain_binlog = false; + RowsetBinlogMetasPB rowset_binlog_metas_pb; + if (binlog_metas_file_exists) { + auto binlog_meta_filesize = std::filesystem::file_size(binlog_metas_file); + if (binlog_meta_filesize > 0) { + contain_binlog = true; + RETURN_IF_ERROR(read_pb(binlog_metas_file, &rowset_binlog_metas_pb)); + } + RETURN_IF_ERROR(io::global_local_filesystem()->delete_file(binlog_metas_file)); + } + if (contain_binlog) { + auto binlog_dir = fmt::format("{}/_binlog", tablet_dir); + RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(binlog_dir)); + } + // check all files in /clone and /tablet std::vector clone_files; RETURN_IF_ERROR(io::global_local_filesystem()->list(clone_dir, true, &clone_files, &exists)); @@ -546,7 +572,6 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, const std::string& clone_d } std::vector local_files; - const auto& tablet_dir = tablet->tablet_path(); RETURN_IF_ERROR(io::global_local_filesystem()->list(tablet_dir, true, &local_files, &exists)); std::unordered_set local_file_names; for (auto& file : local_files) { @@ -575,10 +600,28 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, const std::string& clone_d } auto from = fmt::format("{}/{}", clone_dir, clone_file); - auto to = fmt::format("{}/{}", tablet_dir, clone_file); + std::string to; + if (clone_file.ends_with(".binlog")) { + if (!contain_binlog) { + LOG(WARNING) << "clone binlog file, but not contain binlog metas. " + << "tablet=" << tablet->full_name() << ", clone_file=" << clone_file; + break; + } + + // change clone_file suffix .binlog to .dat + std::string new_clone_file = clone_file; + new_clone_file.replace(clone_file.size() - 7, 7, ".dat"); + to = fmt::format("{}/_binlog/{}", tablet_dir, new_clone_file); + } else { + to = fmt::format("{}/{}", tablet_dir, clone_file); + } + RETURN_IF_ERROR(io::global_local_filesystem()->link_file(from, to)); linked_success_files.emplace_back(std::move(to)); } + if (contain_binlog) { + RETURN_IF_ERROR(tablet->ingest_binlog_metas(&rowset_binlog_metas_pb)); + } // clone and compaction operation should be performed sequentially std::lock_guard base_compaction_lock(tablet->get_base_compaction_lock()); diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index 7c0c83a4a09ee0..dc20228ffdb5f6 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -342,3 +342,17 @@ message PendingPublishInfoPB { optional int64 partition_id = 1; optional int64 transaction_id = 2; } + +message RowsetBinlogMetasPB { + message RowsetBinlogMetaPB { + optional string rowset_id = 1; + optional int64 version = 2; + optional int64 num_segments = 3; + optional string meta_key = 4; + optional bytes meta = 5; + optional string data_key = 6; + optional bytes data = 7; + } + + repeated RowsetBinlogMetaPB rowset_binlog_metas = 1; +} diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index 0fec8c63999a35..18a473e2d4d2fd 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -338,6 +338,7 @@ struct TSnapshotRequest { 10: optional bool is_copy_tablet_task 11: optional Types.TVersion start_version 12: optional Types.TVersion end_version + 13: optional bool is_copy_binlog } struct TReleaseSnapshotRequest {