Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/http/action/download_binlog_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ void handle_get_rowset_meta(HttpRequest* req) {
auto tablet = get_tablet(tablet_id);
const auto& rowset_id = get_http_param(req, kRowsetIdParameter);
const auto& binlog_version = get_http_param(req, kBinlogVersionParameter);
auto rowset_meta = tablet->get_binlog_rowset_meta(binlog_version, rowset_id);
auto rowset_meta = tablet->get_rowset_binlog_meta(binlog_version, rowset_id);
if (rowset_meta.empty()) {
HttpChannel::send_reply(req, HttpStatus::NOT_FOUND,
fmt::format("get rowset meta failed, rowset_id={}", rowset_id));
Expand Down
41 changes: 25 additions & 16 deletions be/src/olap/binlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ constexpr std::string_view kBinlogPrefix = "binlog_";
constexpr std::string_view kBinlogMetaPrefix = "binlog_meta_";
constexpr std::string_view kBinlogDataPrefix = "binlog_data_";

inline auto make_binlog_meta_key(std::string_view tablet, int64_t version,
std::string_view rowset) {
// Builds a binlog meta key of the form
// "<kBinlogPrefix>meta_<tablet>_<version>_<rowset>", where <version> is rendered
// as a zero-padded, 20-digit decimal number.
inline auto make_binlog_meta_key(const std::string_view tablet, int64_t version,
                                 const std::string_view rowset) {
    auto meta_key = fmt::format("{}meta_{}_{:020d}_{}", kBinlogPrefix, tablet, version, rowset);
    return meta_key;
}

inline auto make_binlog_meta_key(std::string_view tablet, std::string_view version_str,
std::string_view rowset) {
inline auto make_binlog_meta_key(const std::string_view tablet, const std::string_view version_str,
const std::string_view rowset) {
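// TODO(Drogon): format version_str directly with zero-padding (like the string overload of
// make_binlog_data_key) instead of converting it to a number first.
// NOTE: std::atoll below reads until a NUL byte, which std::string_view does not guarantee.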
int64_t version = std::atoll(version_str.data());
return make_binlog_meta_key(tablet, version, rowset);
Expand All @@ -46,13 +46,21 @@ inline auto make_binlog_meta_key(const TabletUid& tablet_uid, int64_t version,
return make_binlog_meta_key(tablet_uid.to_string(), version, rowset_id.to_string());
}

inline auto make_binlog_data_key(std::string_view tablet, int64_t version,
std::string_view rowset) {
// Key prefix covering the binlog meta entries of one tablet:
// "<kBinlogPrefix>meta_<tablet_uid>_".
inline auto make_binlog_meta_key_prefix(const TabletUid& tablet_uid) {
    const auto tablet = tablet_uid.to_string();
    return fmt::format("{}meta_{}_", kBinlogPrefix, tablet);
}

// Key prefix covering the binlog meta entries of one tablet at one version:
// "<kBinlogPrefix>meta_<tablet_uid>_<version>_", with <version> zero-padded to 20 digits.
inline auto make_binlog_meta_key_prefix(const TabletUid& tablet_uid, int64_t version) {
    const auto tablet = tablet_uid.to_string();
    return fmt::format("{}meta_{}_{:020d}_", kBinlogPrefix, tablet, version);
}

// Builds a binlog data key of the form
// "<kBinlogPrefix>data_<tablet>_<version>_<rowset>", where <version> is rendered
// as a zero-padded, 20-digit decimal number.
inline auto make_binlog_data_key(const std::string_view tablet, int64_t version,
                                 const std::string_view rowset) {
    auto data_key = fmt::format("{}data_{}_{:020d}_{}", kBinlogPrefix, tablet, version, rowset);
    return data_key;
}

inline auto make_binlog_data_key(std::string_view tablet, std::string_view version,
std::string_view rowset) {
// String-version overload: builds "<kBinlogPrefix>data_<tablet>_<version>_<rowset>".
// The version text is right-aligned and left-filled with '0' up to 20 characters
// ("{:0>20}"); it is not parsed as a number.
inline auto make_binlog_data_key(const std::string_view tablet, const std::string_view version,
                                 const std::string_view rowset) {
    auto data_key = fmt::format("{}data_{}_{:0>20}_{}", kBinlogPrefix, tablet, version, rowset);
    return data_key;
}

Expand All @@ -61,19 +69,20 @@ inline auto make_binlog_data_key(const TabletUid& tablet_uid, int64_t version,
return make_binlog_data_key(tablet_uid.to_string(), version, rowset_id.to_string());
}

inline auto make_binlog_filename_key(const TabletUid& tablet_uid, std::string_view version) {
return fmt::format("{}meta_{}_{:0>20}_", kBinlogPrefix, tablet_uid.to_string(), version);
// Convenience overload: stringifies the tablet uid and forwards to the
// make_binlog_data_key overload that takes the tablet as text.
inline auto make_binlog_data_key(const TabletUid& tablet_uid, int64_t version,
                                 const std::string_view rowset_id) {
    const auto tablet = tablet_uid.to_string();
    return make_binlog_data_key(tablet, version, rowset_id);
}

inline auto make_binlog_meta_key_prefix(const TabletUid& tablet_uid) {
return fmt::format("{}meta_{}_", kBinlogPrefix, tablet_uid.to_string());
// Key prefix covering the binlog data entries of one tablet at one version:
// "<kBinlogPrefix>data_<tablet_uid>_<version>_", with <version> zero-padded to 20 digits.
inline auto make_binlog_data_key_prefix(const TabletUid& tablet_uid, int64_t version) {
    const auto tablet = tablet_uid.to_string();
    return fmt::format("{}data_{}_{:020d}_", kBinlogPrefix, tablet, version);
}

inline auto make_binlog_meta_key_prefix(const TabletUid& tablet_uid, int64_t version) {
return fmt::format("{}meta_{}_{:020d}_", kBinlogPrefix, tablet_uid.to_string(), version);
// Builds "<kBinlogPrefix>meta_<tablet_uid>_<version>_" from a textual version.
// The version text is left-filled with '0' up to 20 characters ("{:0>20}").
// Note that, despite the function name, the result uses the "meta_" key segment.
inline auto make_binlog_filename_key(const TabletUid& tablet_uid, const std::string_view version) {
    const auto tablet = tablet_uid.to_string();
    return fmt::format("{}meta_{}_{:0>20}_", kBinlogPrefix, tablet, version);
}

inline bool starts_with_binlog_meta(std::string_view str) {
inline bool starts_with_binlog_meta(const std::string_view str) {
auto prefix = kBinlogMetaPrefix;
if (prefix.length() > str.length()) {
return false;
Expand All @@ -82,7 +91,7 @@ inline bool starts_with_binlog_meta(std::string_view str) {
return str.compare(0, prefix.length(), prefix) == 0;
}

inline std::string get_binlog_data_key_from_meta_key(std::string_view meta_key) {
// Derives a binlog data key from a binlog meta key by replacing the leading
// kBinlogMetaPrefix with "<kBinlogPrefix>data_", e.g.
// "binlog_meta_6943f1585fe834b5-e542c2b83a21d0b7" => "binlog_data_6943f1585fe834b5-e542c2b83a21d0b7".
// The prefix is removed by length only, so the caller must pass a key that
// really starts with kBinlogMetaPrefix.
inline std::string get_binlog_data_key_from_meta_key(const std::string_view meta_key) {
    const auto suffix = meta_key.substr(kBinlogMetaPrefix.length());
    return fmt::format("{}data_{}", kBinlogPrefix, suffix);
}
Expand Down
73 changes: 73 additions & 0 deletions be/src/olap/pb_helper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <fstream>

#include "common/status.h"

namespace doris {
/// Reads a serialized protobuf message from a binary file.
///
/// @tparam PB          message type; must provide ParseFromIstream(std::istream*).
/// @param pb_filename  path of the file to read.
/// @param pb           destination message, filled in by ParseFromIstream.
/// @return Status::OK() on success; Status::InternalError if the file cannot be
///         opened or its content cannot be parsed.
template <typename PB>
Status read_pb(const std::string& pb_filename, PB* pb) {
    std::fstream pb_file_stream(pb_filename, std::ios::in | std::ios::binary);
    // A failed open sets failbit, not badbit, so bad() would never report a
    // missing or unreadable file. is_open() is the reliable check here.
    if (!pb_file_stream.is_open()) {
        auto err_msg = fmt::format("fail to open rowset binlog metas file. [path={}]", pb_filename);
        LOG(WARNING) << err_msg;
        return Status::InternalError(err_msg);
    }

    if (!pb->ParseFromIstream(&pb_file_stream)) {
        return Status::InternalError("fail to parse rowset binlog metas file");
    }
    return Status::OK();
}

/// Serializes a protobuf message into a binary file, truncating any existing content.
///
/// @tparam PB          message type; must provide SerializeToOstream(std::ostream*).
/// @param pb_filename  path of the file to (over)write.
/// @param pb           message to serialize.
/// @return Status::OK() on success; Status::InternalError if opening, serializing,
///         flushing or closing the file fails.
template <typename PB>
Status write_pb(const std::string& pb_filename, const PB& pb) {
    std::fstream pb_file_stream(pb_filename, std::ios::out | std::ios::trunc | std::ios::binary);
    // A failed open sets failbit, not badbit, so bad() would never report it.
    if (!pb_file_stream.is_open()) {
        auto err_msg = fmt::format("fail to open rowset binlog metas file. [path={}]", pb_filename);
        LOG(WARNING) << err_msg;
        return Status::InternalError(err_msg);
    }

    if (!pb.SerializeToOstream(&pb_file_stream)) {
        auto err_msg =
                fmt::format("fail to save rowset binlog metas to file. [path={}]", pb_filename);
        LOG(WARNING) << err_msg;
        return Status::InternalError(err_msg);
    }

    if (!pb_file_stream.flush()) {
        auto err_msg =
                fmt::format("fail to flush rowset binlog metas to file. [path={}]", pb_filename);
        LOG(WARNING) << err_msg;
        return Status::InternalError(err_msg);
    }

    pb_file_stream.close();
    // close() reports failure through failbit; fail() covers both failbit and badbit.
    if (pb_file_stream.fail()) {
        auto err_msg =
                fmt::format("fail to close rowset binlog metas file. [path={}]", pb_filename);
        LOG(WARNING) << err_msg;
        return Status::InternalError(err_msg);
    }

    return Status::OK();
}
} // namespace doris
152 changes: 151 additions & 1 deletion be/src/olap/rowset/rowset_meta_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ std::pair<std::string, int64_t> RowsetMetaManager::get_binlog_info(
return std::make_pair(rowset_id, num_segments);
}

std::string RowsetMetaManager::get_binlog_rowset_meta(OlapMeta* meta, TabletUid tablet_uid,
std::string RowsetMetaManager::get_rowset_binlog_meta(OlapMeta* meta, TabletUid tablet_uid,
std::string_view binlog_version,
std::string_view rowset_id) {
auto binlog_data_key = make_binlog_data_key(tablet_uid.to_string(), binlog_version, rowset_id);
Expand All @@ -275,6 +275,135 @@ std::string RowsetMetaManager::get_binlog_rowset_meta(OlapMeta* meta, TabletUid
return binlog_meta_value;
}

// Fills metas_pb with rowset binlog metas of the tablet. An empty
// binlog_versions list selects the "all metas" path; otherwise only the
// listed versions are looked up.
Status RowsetMetaManager::get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid,
                                                  const std::vector<int64_t>& binlog_versions,
                                                  RowsetBinlogMetasPB* metas_pb) {
    if (!binlog_versions.empty()) {
        return _get_rowset_binlog_metas(meta, tablet_uid, binlog_versions, metas_pb);
    }
    return _get_all_rowset_binlog_metas(meta, tablet_uid, metas_pb);
}

// Collects, for each requested binlog version, the binlog meta entry found under
// that version's key prefix together with its matching binlog data value, and
// appends them to metas_pb.
//
// @param meta             store that is iterated and queried.
// @param tablet_uid       tablet whose binlog entries are read.
// @param binlog_versions  versions to look up, one prefix scan per version.
// @param metas_pb         output; one rowset_binlog_metas element is appended per entry.
// @return the first error from iterating, parsing or fetching data; otherwise the
//         default-constructed Status.
Status RowsetMetaManager::_get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid,
                                                   const std::vector<int64_t>& binlog_versions,
                                                   RowsetBinlogMetasPB* metas_pb) {
    Status status;
    auto tablet_uid_str = tablet_uid.to_string();
    // Errors raised inside the callback are handed back through `status`, because
    // the callback itself can only return a bool.
    auto traverse_func = [meta, metas_pb, &status, &tablet_uid_str](
                                 const std::string& key, const std::string& value) -> bool {
        VLOG_DEBUG << fmt::format("key:{}, value:{}", key, value);
        if (!starts_with_binlog_meta(key)) {
            auto err_msg = fmt::format("invalid binlog meta key:{}", key);
            status = Status::InternalError(err_msg);
            LOG(WARNING) << err_msg;
            return false;
        }

        BinlogMetaEntryPB binlog_meta_entry_pb;
        if (!binlog_meta_entry_pb.ParseFromString(value)) {
            auto err_msg = fmt::format("fail to parse binlog meta value:{}", value);
            status = Status::InternalError(err_msg);
            LOG(WARNING) << err_msg;
            return false;
        }
        auto& rowset_id = binlog_meta_entry_pb.rowset_id_v2();

        auto binlog_meta_pb = metas_pb->add_rowset_binlog_metas();
        binlog_meta_pb->set_rowset_id(rowset_id);
        binlog_meta_pb->set_version(binlog_meta_entry_pb.version());
        binlog_meta_pb->set_num_segments(binlog_meta_entry_pb.num_segments());
        binlog_meta_pb->set_meta_key(key);
        binlog_meta_pb->set_meta(value);

        auto binlog_data_key =
                make_binlog_data_key(tablet_uid_str, binlog_meta_entry_pb.version(), rowset_id);
        std::string binlog_data;
        status = meta->get(META_COLUMN_FAMILY_INDEX, binlog_data_key, &binlog_data);
        // Test the status returned by get() with ok(), the same accessor used for
        // the other status checks in this function.
        if (!status.ok()) {
            LOG(WARNING) << status.to_string();
            return false;
        }
        binlog_meta_pb->set_data_key(binlog_data_key);
        binlog_meta_pb->set_data(binlog_data);

        // Always false: only the first entry under a version prefix is consumed.
        // NOTE(review): presumably false tells iterate() to stop and each version
        // has a single rowset -- confirm against OlapMeta::iterate.
        return false;
    };

    for (auto& binlog_version : binlog_versions) {
        auto prefix_key = make_binlog_meta_key_prefix(tablet_uid, binlog_version);
        Status iterStatus = meta->iterate(META_COLUMN_FAMILY_INDEX, prefix_key, traverse_func);
        if (!iterStatus.ok()) {
            LOG(WARNING) << fmt::format("fail to iterate binlog meta. prefix_key:{}, status:{}",
                                        prefix_key, iterStatus.to_string());
            return iterStatus;
        }
        // Surface an error recorded by the callback before moving to the next version.
        if (!status.ok()) {
            return status;
        }
    }
    return status;
}

// Scans the binlog meta entries under the tablet's meta key prefix and appends
// each entry, together with its matching binlog data value, to metas_pb.
//
// @param meta        store that is iterated and queried.
// @param tablet_uid  tablet whose binlog entries are read.
// @param metas_pb    output; one rowset_binlog_metas element is appended per entry.
// @return the iterate() error if iteration fails; otherwise the status recorded by
//         the callback (parse or data-fetch error, or the default Status).
Status RowsetMetaManager::_get_all_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid,
                                                       RowsetBinlogMetasPB* metas_pb) {
    Status status;
    auto tablet_uid_str = tablet_uid.to_string();
    // tablet_id of the first parsed entry; 0 means "not seen yet".
    int64_t tablet_id = 0;
    // Errors raised inside the callback are handed back through `status`, because
    // the callback itself can only return a bool.
    auto traverse_func = [meta, metas_pb, &status, &tablet_uid_str, &tablet_id](
                                 const std::string& key, const std::string& value) -> bool {
        VLOG_DEBUG << fmt::format("key:{}, value:{}", key, value);
        if (!starts_with_binlog_meta(key)) {
            // Not an error here: a non-binlog-meta key just ends the scan.
            LOG(INFO) << fmt::format("end scan binlog meta. key:{}", key);
            return false;
        }

        BinlogMetaEntryPB binlog_meta_entry_pb;
        if (!binlog_meta_entry_pb.ParseFromString(value)) {
            auto err_msg = fmt::format("fail to parse binlog meta value:{}", value);
            status = Status::InternalError(err_msg);
            LOG(WARNING) << err_msg;
            return false;
        }
        if (tablet_id == 0) {
            tablet_id = binlog_meta_entry_pb.tablet_id();
        } else if (tablet_id != binlog_meta_entry_pb.tablet_id()) {
            // scan all binlog meta, so tablet_id should be same:
            // a different tablet_id ends the scan without recording an error.
            return false;
        }
        auto& rowset_id = binlog_meta_entry_pb.rowset_id_v2();

        auto binlog_meta_pb = metas_pb->add_rowset_binlog_metas();
        binlog_meta_pb->set_rowset_id(rowset_id);
        binlog_meta_pb->set_version(binlog_meta_entry_pb.version());
        binlog_meta_pb->set_num_segments(binlog_meta_entry_pb.num_segments());
        binlog_meta_pb->set_meta_key(key);
        binlog_meta_pb->set_meta(value);

        auto binlog_data_key =
                make_binlog_data_key(tablet_uid_str, binlog_meta_entry_pb.version(), rowset_id);
        std::string binlog_data;
        status = meta->get(META_COLUMN_FAMILY_INDEX, binlog_data_key, &binlog_data);
        // Test the status returned by get() with ok(), the same accessor used for
        // the iterate() status below.
        if (!status.ok()) {
            LOG(WARNING) << status.to_string();
            return false;
        }
        binlog_meta_pb->set_data_key(binlog_data_key);
        binlog_meta_pb->set_data(binlog_data);

        // NOTE(review): presumably true asks iterate() for the next entry -- confirm.
        return true;
    };

    auto prefix_key = make_binlog_meta_key_prefix(tablet_uid);
    Status iterStatus = meta->iterate(META_COLUMN_FAMILY_INDEX, prefix_key, traverse_func);
    if (!iterStatus.ok()) {
        LOG(WARNING) << fmt::format("fail to iterate binlog meta. prefix_key:{}, status:{}",
                                    prefix_key, iterStatus.to_string());
        return iterStatus;
    }
    return status;
}

Status RowsetMetaManager::remove(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id) {
std::string key = ROWSET_PREFIX + tablet_uid.to_string() + "_" + rowset_id.to_string();
VLOG_NOTICE << "start to remove rowset, key:" << key;
Expand All @@ -288,6 +417,27 @@ Status RowsetMetaManager::remove_binlog(OlapMeta* meta, const std::string& suffi
{kBinlogMetaPrefix.data() + suffix, kBinlogDataPrefix.data() + suffix});
}

// Writes every rowset binlog meta in metas_pb into `meta` as one batch put.
// The meta_key / data_key fields of each element are overwritten in place with
// keys rebuilt from tablet_uid, the element's version and its rowset id.
Status RowsetMetaManager::ingest_binlog_metas(OlapMeta* meta, TabletUid tablet_uid,
                                              RowsetBinlogMetasPB* metas_pb) {
    const auto uid_str = tablet_uid.to_string();
    std::vector<OlapMeta::BatchEntry> batch;

    for (auto& binlog_meta : *metas_pb->mutable_rowset_binlog_metas()) {
        const auto& rowset_id = binlog_meta.rowset_id();
        const auto version = binlog_meta.version();

        // The rebuilt keys are stored inside the protobuf element itself, and the
        // batch entries are created from those stored strings.
        // NOTE(review): BatchEntry may keep references to them -- confirm before
        // switching to local temporaries.
        auto* stored_meta_key = binlog_meta.mutable_meta_key();
        *stored_meta_key = make_binlog_meta_key(uid_str, version, rowset_id);
        auto* stored_data_key = binlog_meta.mutable_data_key();
        *stored_data_key = make_binlog_data_key(uid_str, version, rowset_id);

        batch.emplace_back(*stored_meta_key, binlog_meta.meta());
        batch.emplace_back(*stored_data_key, binlog_meta.data());
    }

    return meta->put(META_COLUMN_FAMILY_INDEX, batch);
}

Status RowsetMetaManager::traverse_rowset_metas(
OlapMeta* meta,
std::function<bool(const TabletUid&, const RowsetId&, const std::string&)> const& func) {
Expand Down
20 changes: 14 additions & 6 deletions be/src/olap/rowset/rowset_meta_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,31 +53,39 @@ class RowsetMetaManager {
const RowsetMetaPB& rowset_meta_pb, bool enable_binlog);
static Status save(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id,
const RowsetMetaPB& rowset_meta_pb);

static std::vector<std::string> get_binlog_filenames(OlapMeta* meta, TabletUid tablet_uid,
std::string_view binlog_version,
int64_t segment_idx);
static std::pair<std::string, int64_t> get_binlog_info(OlapMeta* meta, TabletUid tablet_uid,
std::string_view binlog_version);
static std::string get_binlog_rowset_meta(OlapMeta* meta, TabletUid tablet_uid,
static std::string get_rowset_binlog_meta(OlapMeta* meta, TabletUid tablet_uid,
std::string_view binlog_version,
std::string_view rowset_id);

static Status remove(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id);

static Status get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid,
const std::vector<int64_t>& binlog_versions,
RowsetBinlogMetasPB* metas_pb);
static Status remove_binlog(OlapMeta* meta, const std::string& suffix);

static Status ingest_binlog_metas(OlapMeta* meta, TabletUid tablet_uid,
RowsetBinlogMetasPB* metas_pb);
static Status traverse_rowset_metas(OlapMeta* meta,
std::function<bool(const TabletUid&, const RowsetId&,
const std::string&)> const& collector);

static Status traverse_binlog_metas(
OlapMeta* meta, std::function<bool(const string&, const string&, bool)> const& func);

static Status remove(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id);

static Status load_json_rowset_meta(OlapMeta* meta, const std::string& rowset_meta_path);

private:
static Status _save_with_binlog(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id,
const RowsetMetaPB& rowset_meta_pb);
static Status _get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid,
const std::vector<int64_t>& binlog_versions,
RowsetBinlogMetasPB* metas_pb);
static Status _get_all_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid,
RowsetBinlogMetasPB* metas_pb);
};

} // namespace doris
Expand Down
Loading