Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,15 @@ void AgentServer::make_snapshot(TAgentResult& return_value,
TStatus status;
vector<string> error_msgs;
TStatusCode::type status_code = TStatusCode::OK;
return_value.__set_snapshot_version(PREFERRED_SNAPSHOT_VERSION);
int32_t return_snapshot_version = PREFERRED_SNAPSHOT_VERSION;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add some comment here to describe the version meaning.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// if the request's snapshot version is less than current be's snapshot version
// it means the request be is under old version. just set the request version to 1
// current be will generate snapshot files like tabletid_schemahash_startversion_endversion
// format. Every be is able to parse this format snapshot files.
if (snapshot_request.preferred_snapshot_version < PREFERRED_SNAPSHOT_VERSION) {
return_snapshot_version = 1;
}
return_value.__set_snapshot_version(return_snapshot_version);
string snapshot_path;
OLAPStatus make_snapshot_status =
SnapshotManager::instance()->make_snapshot(snapshot_request, &snapshot_path);
Expand Down
6 changes: 3 additions & 3 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ Compaction::Compaction(TabletSharedPtr tablet)
_state(CompactionState::INITED)
{ }

Compaction::~Compaction() { }
Compaction::~Compaction() {}

OLAPStatus Compaction::do_compaction() {
LOG(INFO) << "start " << compaction_name() << ". tablet=" << _tablet->full_name();
Expand Down Expand Up @@ -83,8 +83,8 @@ OLAPStatus Compaction::do_compaction() {
}

OLAPStatus Compaction::construct_output_rowset_writer() {
RowsetId rowset_id = 0;
RETURN_NOT_OK(_tablet->next_rowset_id(&rowset_id));
RowsetId rowset_id;
RETURN_NOT_OK(StorageEngine::instance()->next_rowset_id(&rowset_id));
RowsetWriterContext context;
context.rowset_id = rowset_id;
context.tablet_uid = _tablet->tablet_uid();
Expand Down
22 changes: 10 additions & 12 deletions be/src/olap/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,6 @@ Status DataDir::init() {
RETURN_IF_ERROR(_init_file_system());
RETURN_IF_ERROR(_init_meta());

_id_generator = new RowsetIdGenerator(_meta);
auto res = _id_generator->init();
if (res != OLAP_SUCCESS) {
return Status::InternalError("Id generator initialized failed.");
}

_is_used = true;
return Status::OK();
}
Expand Down Expand Up @@ -582,7 +576,9 @@ OLAPStatus DataDir::_convert_old_tablet() {
for (auto& rowset_pb : pending_rowsets) {
string meta_binary;
rowset_pb.SerializeToString(&meta_binary);
status = RowsetMetaManager::save(_meta, rowset_pb.tablet_uid(), rowset_pb.rowset_id() , meta_binary);
RowsetId rowset_id;
rowset_id.init(rowset_pb.rowset_id_v2());
status = RowsetMetaManager::save(_meta, rowset_pb.tablet_uid(), rowset_id, meta_binary);
if (status != OLAP_SUCCESS) {
LOG(FATAL) << "convert olap header to tablet meta failed when save rowset meta tablet="
<< tablet_id << "." << schema_hash;
Expand Down Expand Up @@ -903,11 +899,12 @@ void DataDir::perform_path_gc() {
}
} else {
bool valid = tablet->check_path(path);
// TODO(ygl): should change a method to do gc
if (!valid) {
RowsetId rowset_id = -1;
RowsetId rowset_id;
bool is_rowset_file = _tablet_manager->get_rowset_id_from_path(path, &rowset_id);
if (is_rowset_file) {
std::string rowset_path_id = ROWSET_ID_PREFIX + std::to_string(rowset_id);
std::string rowset_path_id = ROWSET_ID_PREFIX + rowset_id.to_string();
bool exist_in_pending = _check_pending_ids(rowset_path_id);
if (!exist_in_pending) {
_process_garbage_path(path);
Expand Down Expand Up @@ -959,18 +956,19 @@ void DataDir::perform_path_gc_by_rowsetid() {
// tablet schema hash path or rowset file path
// gc thread should get tablet include deleted tablet
// or it will delete rowset file before tablet is garbage collected
RowsetId rowset_id = -1;
RowsetId rowset_id;
bool is_rowset_file = _tablet_manager->get_rowset_id_from_path(path, &rowset_id);
if (is_rowset_file) {
TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id, schema_hash);
if (tablet != nullptr) {
bool valid = tablet->check_rowset_id(rowset_id);
if (!valid) {
// if the rowset id is less than tablet's initial end rowset id
// if the rowset id is in using rowset set
// and the rowsetid is not in unused_rowsets
// and the rowsetid is not in committed rowsets
// then delete the path.
if (rowset_id < tablet->initial_end_rowset_id()
// TODO(ygl): check rowset id
if (!StorageEngine::instance()->rowset_id_in_use(rowset_id)
&& !StorageEngine::instance()->check_rowset_id_in_unused_rowsets(rowset_id)
&& !RowsetMetaManager::check_rowset_meta(_meta, tablet->tablet_uid(), rowset_id)) {
_process_garbage_path(path);
Expand Down
10 changes: 1 addition & 9 deletions be/src/olap/data_dir.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,7 @@ class DataDir {
void find_tablet_in_trash(int64_t tablet_id, std::vector<std::string>* paths);

static std::string get_root_path_from_schema_hash_path_in_trash(const std::string& schema_hash_dir_in_trash);
/*
OLAPStatus next_id(RowsetId* id) {
return _id_generator->get_next_id(id);
}

OLAPStatus set_next_id(RowsetId new_rowset_id) {
return _id_generator->set_next_id(new_rowset_id);
}
*/

// load data from meta and data files
OLAPStatus load();

Expand Down
6 changes: 3 additions & 3 deletions be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ DeltaWriter::~DeltaWriter() {
SAFE_DELETE(_mem_table);
SAFE_DELETE(_schema);
if (_rowset_writer != nullptr) {
_rowset_writer->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + std::to_string(_rowset_writer->rowset_id()));
_rowset_writer->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX + _rowset_writer->rowset_id().to_string());
}
}

Expand Down Expand Up @@ -115,8 +115,8 @@ OLAPStatus DeltaWriter::init() {
}
}

RowsetId rowset_id = 0; // get rowset_id from id generator
OLAPStatus status = _tablet->next_rowset_id(&rowset_id);
RowsetId rowset_id; // get rowset_id from id generator
OLAPStatus status = StorageEngine::instance()->next_rowset_id(&rowset_id);
if (status != OLAP_SUCCESS) {
LOG(WARNING) << "generate rowset id failed, status:" << status;
return OLAP_ERR_ROWSET_GENERATE_ID_FAILED;
Expand Down
84 changes: 83 additions & 1 deletion be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <list>
#include <map>
#include <memory>
#include <ostream>
#include <sstream>
#include <string>
#include <typeinfo>
Expand All @@ -35,8 +36,12 @@
#include "util/hash_util.hpp"
#include "util/uid_util.h"

#define LOW_56_BITS 0x00ffffffffffffff

namespace doris {

static const int64_t MAX_ROWSET_ID = 1L << 56;

typedef int32_t SchemaHash;
typedef int64_t VersionHash;
typedef __int128 int128_t;
Expand Down Expand Up @@ -241,7 +246,84 @@ typedef std::set<uint32_t> UniqueIdSet;
// Column unique Id -> column id map
typedef std::map<ColumnId, ColumnId> UniqueIdToColumnIdMap;

typedef int64_t RowsetId;
// 128 bit backend uid, it is a uuid bit, id version
// 8 bit rowset id version
// 56 bit, inc number from 0
struct RowsetId {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can RowsetId support ++ operator?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can directly use protobuf to do serialize and deserialize?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is useless using ++ , because it will need to get a rowsetidgenerator object.
Not use pb as a serialize method, we use string.

int8_t version = 0;
int64_t hi = 0;
int64_t mi = 0;
int64_t lo = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uint64_t?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UniqueId generator using int64_t so that I use int64_t here


void init(const std::string& rowset_id_str) {
// for new rowsetid its a 48 hex string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// for new rowsetid its a 48 hex string
// for new rowsetid its a 48 bytes hex string

// if the len < 48, then it is an old format rowset id
if (rowset_id_str.length() < 48) {
int64_t low = std::stol(rowset_id_str, nullptr, 10);
init(1, 0, 0, low);
} else {
int64_t high = 0;
int64_t middle = 0;
int64_t low = 0;
from_hex(&high, rowset_id_str.substr(0, 16));
from_hex(&middle, rowset_id_str.substr(16, 16));
from_hex(&low, rowset_id_str.substr(32, 16));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
from_hex(&low, rowset_id_str.substr(32, 16));
from_hex(&_hi, rowset_id_str.substr(0, 16));
from_hex(&_mi, rowset_id_str.substr(16, 16));
from_hex(&_lo, rowset_id_str.substr(32, 16));

init(low >> 56, high, middle, low & LOW_56_BITS);
}
}

// to compatiable with old version
void init(int64_t rowset_id) {
init(1, 0, 0, rowset_id);
}

void init(int64_t id_version, int64_t high, int64_t middle, int64_t low) {
version = id_version;
if (low >= MAX_ROWSET_ID) {
LOG(FATAL) << "low is too large:" << low;
}
hi = high;
mi = middle;
lo = (id_version << 56) + (low & LOW_56_BITS);
}

std::string to_string() const {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need to_string and serialize and deserialize api

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to_string is ok, because it is a string value in pb.

if (version < 2) {
return std::to_string(lo & LOW_56_BITS);
} else {
char buf[48];
to_hex(hi, buf);
to_hex(mi, buf + 16);
to_hex(lo, buf + 32);
return {buf, 48};
}
}

// std::unordered_map need this api
bool operator==(const RowsetId& rhs) const {
return lo == rhs.lo && hi == rhs.hi && mi == rhs.mi ;
}

bool operator!=(const RowsetId& rhs) const {
return lo != rhs.lo || hi != rhs.hi || mi != rhs.mi ;
}

bool operator<(const RowsetId& rhs) const {
if (hi != rhs.hi) {
return hi < rhs.hi;
} else if (mi != rhs.mi) {
return mi < rhs.mi;
} else {
return lo < rhs.lo;
}
}

friend std::ostream& operator<<(std::ostream& out, const RowsetId& rowset_id) {
out << rowset_id.to_string();
return out;
}

};

} // namespace doris

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/olap_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ static constexpr uint32_t OLAP_COMPACTION_DEFAULT_CANDIDATE_SIZE = 10;
// the max length supported for string type
static const uint16_t OLAP_STRING_MAX_LENGTH = 65535;

static const int32_t PREFERRED_SNAPSHOT_VERSION = 2;
static const int32_t PREFERRED_SNAPSHOT_VERSION = 3;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add some comment to explain the version 1 , 2, and 3?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


// the max bytes for stored string length
using StringOffsetType = uint32_t;
Expand Down
24 changes: 15 additions & 9 deletions be/src/olap/olap_snapshot_converter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,12 @@ OLAPStatus OlapSnapshotConverter::to_tablet_meta_pb(const OLAPHeaderMessage& ola
schema->set_next_column_unique_id(olap_header.next_column_unique_id());
}

RowsetId next_id = 10000;
std::unordered_map<Version, RowsetMetaPB*, HashOfVersion> _rs_version_map;
for (auto& delta : olap_header.delta()) {
RowsetId next_id;
RETURN_NOT_OK(StorageEngine::instance()->next_rowset_id(&next_id));
RowsetMetaPB* rowset_meta = tablet_meta_pb->add_rs_metas();
convert_to_rowset_meta(delta, ++next_id, olap_header.tablet_id(), olap_header.schema_hash(), rowset_meta);
convert_to_rowset_meta(delta, next_id, olap_header.tablet_id(), olap_header.schema_hash(), rowset_meta);
Version rowset_version = { delta.start_version(), delta.end_version() };
_rs_version_map[rowset_version] = rowset_meta;
}
Expand All @@ -137,13 +138,17 @@ OLAPStatus OlapSnapshotConverter::to_tablet_meta_pb(const OLAPHeaderMessage& ola
*rowset_meta = *(exist_rs->second);
continue;
}
RowsetId next_id;
RETURN_NOT_OK(StorageEngine::instance()->next_rowset_id(&next_id));
RowsetMetaPB* rowset_meta = tablet_meta_pb->add_inc_rs_metas();
convert_to_rowset_meta(inc_delta, ++next_id, olap_header.tablet_id(), olap_header.schema_hash(), rowset_meta);
convert_to_rowset_meta(inc_delta, next_id, olap_header.tablet_id(), olap_header.schema_hash(), rowset_meta);
}

for (auto& pending_delta : olap_header.pending_delta()) {
RowsetId next_id;
RETURN_NOT_OK(StorageEngine::instance()->next_rowset_id(&next_id));
RowsetMetaPB rowset_meta;
convert_to_rowset_meta(pending_delta, ++next_id, olap_header.tablet_id(), olap_header.schema_hash(), &rowset_meta);
convert_to_rowset_meta(pending_delta, next_id, olap_header.tablet_id(), olap_header.schema_hash(), &rowset_meta);
pending_rowsets->emplace_back(std::move(rowset_meta));
}
if (olap_header.has_schema_change_status()) {
Expand All @@ -155,7 +160,6 @@ OLAPStatus OlapSnapshotConverter::to_tablet_meta_pb(const OLAPHeaderMessage& ola
}
tablet_meta_pb->set_tablet_state(TabletStatePB::PB_RUNNING);
*(tablet_meta_pb->mutable_tablet_uid()) = TabletUid().to_proto();
tablet_meta_pb->set_end_rowset_id(++next_id);
VLOG(3) << "convert tablet meta tablet id = " << olap_header.tablet_id()
<< " schema hash = " << olap_header.schema_hash() << " successfully.";
return OLAP_SUCCESS;
Expand Down Expand Up @@ -196,8 +200,9 @@ OLAPStatus OlapSnapshotConverter::convert_to_pdelta(const RowsetMetaPB& rowset_m
}

OLAPStatus OlapSnapshotConverter::convert_to_rowset_meta(const PDelta& delta,
int64_t rowset_id, int64_t tablet_id, int32_t schema_hash, RowsetMetaPB* rowset_meta_pb) {
rowset_meta_pb->set_rowset_id(rowset_id);
const RowsetId& rowset_id, int64_t tablet_id, int32_t schema_hash, RowsetMetaPB* rowset_meta_pb) {
rowset_meta_pb->set_rowset_id(0);
rowset_meta_pb->set_rowset_id_v2(rowset_id.to_string());
rowset_meta_pb->set_tablet_id(tablet_id);
rowset_meta_pb->set_tablet_schema_hash(schema_hash);
rowset_meta_pb->set_rowset_type(RowsetTypePB::ALPHA_ROWSET);
Expand Down Expand Up @@ -243,8 +248,9 @@ OLAPStatus OlapSnapshotConverter::convert_to_rowset_meta(const PDelta& delta,
}

OLAPStatus OlapSnapshotConverter::convert_to_rowset_meta(const PPendingDelta& pending_delta,
int64_t rowset_id, int64_t tablet_id, int32_t schema_hash, RowsetMetaPB* rowset_meta_pb) {
rowset_meta_pb->set_rowset_id(rowset_id);
const RowsetId& rowset_id, int64_t tablet_id, int32_t schema_hash, RowsetMetaPB* rowset_meta_pb) {
rowset_meta_pb->set_rowset_id(0);
rowset_meta_pb->set_rowset_id_v2(rowset_id.to_string());
rowset_meta_pb->set_tablet_id(tablet_id);
rowset_meta_pb->set_tablet_schema_hash(schema_hash);
rowset_meta_pb->set_rowset_type(RowsetTypePB::ALPHA_ROWSET);
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/olap_snapshot_converter.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ class OlapSnapshotConverter {

OLAPStatus convert_to_pdelta(const RowsetMetaPB& rowset_meta_pb, PDelta* delta);

OLAPStatus convert_to_rowset_meta(const PDelta& delta, int64_t rowset_id, int64_t tablet_id,
OLAPStatus convert_to_rowset_meta(const PDelta& delta, const RowsetId& rowset_id, int64_t tablet_id,
int32_t schema_hash, RowsetMetaPB* rowset_meta_pb);

OLAPStatus convert_to_rowset_meta(const PPendingDelta& pending_delta, int64_t rowset_id,
OLAPStatus convert_to_rowset_meta(const PPendingDelta& pending_delta, const RowsetId& rowset_id,
int64_t tablet_id, int32_t schema_hash, RowsetMetaPB* rowset_meta_pb);

OLAPStatus to_column_pb(const ColumnMessage& column_msg, ColumnPB* column_pb);
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <vector>

#include "olap/olap_define.h"
#include "util/uid_util.h"

namespace doris {

Expand All @@ -37,6 +38,7 @@ OLAPStatus parse_conf_store_paths(const std::string& config_path, std::vector<St
struct EngineOptions {
// list paths that tablet will be put into.
std::vector<StorePath> store_paths;
UniqueId backend_uid;
};

}
8 changes: 2 additions & 6 deletions be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,12 +270,8 @@ OLAPStatus PushHandler::_convert(TabletSharedPtr cur_tablet,
}
RowsetWriterContext context;
uint32_t num_rows = 0;
RowsetId rowset_id = 0;
res = cur_tablet->next_rowset_id(&rowset_id);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "generate rowset id failed, res:" << res;
return OLAP_ERR_ROWSET_GENERATE_ID_FAILED;
}
RowsetId rowset_id;
RETURN_NOT_OK(StorageEngine::instance()->next_rowset_id(&rowset_id));
PUniqueId load_id;
load_id.set_hi(0);
load_id.set_lo(0);
Expand Down
5 changes: 2 additions & 3 deletions be/src/olap/rowset/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,14 @@ add_library(Rowset STATIC
run_length_byte_writer.cpp
run_length_integer_reader.cpp
run_length_integer_writer.cpp
unique_rowset_id_generator.cpp
segment_reader.cpp
segment_writer.cpp
rowset.cpp
rowset_id_generator.cpp
rowset_meta_manager.cpp
alpha_rowset.cpp
alpha_rowset_reader.cpp
alpha_rowset_writer.cpp
alpha_rowset_meta.cpp
beta_rowset.cpp
beta_rowset_writer.cpp
rowset_id_generator.cpp)
beta_rowset_writer.cpp)
5 changes: 5 additions & 0 deletions be/src/olap/rowset/alpha_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ OLAPStatus AlphaRowsetWriter::flush() {
}

RowsetSharedPtr AlphaRowsetWriter::build() {
if (_current_rowset_meta->rowset_id().version == 0) {
LOG(WARNING) << "invalid rowset id, version == 0, rowset id="
<< _current_rowset_meta->rowset_id().to_string();
return nullptr;
}
if (_writer_state != WRITER_FLUSHED) {
LOG(WARNING) << "invalid writer state before build, state:" << _writer_state;
return nullptr;
Expand Down
Loading