Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/olap/olap_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ enum OLAPStatus {
OLAP_ERR_META_PUT = -3004,
OLAP_ERR_META_ITERATOR = -3005,
OLAP_ERR_META_DELETE = -3006,
OLAP_ERR_META_ALREADY_EXIST = -3007,

// Rowset
// [-3100, -3200)
Expand Down
18 changes: 16 additions & 2 deletions be/src/olap/rowset/alpha_rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,9 @@ int64_t AlphaRowset::ref_count() const {
}

OLAPStatus AlphaRowset::make_snapshot(const std::string& snapshot_path,
std::vector<std::string>* success_files) {
std::vector<std::string>* success_links) {
for (auto& segment_group : _segment_groups) {
OLAPStatus status = segment_group->make_snapshot(snapshot_path, success_files);
OLAPStatus status = segment_group->make_snapshot(snapshot_path, success_links);
if (status != OLAP_SUCCESS) {
LOG(WARNING) << "create hard links failed for segment group:"
<< segment_group->segment_group_id();
Expand All @@ -219,6 +219,20 @@ OLAPStatus AlphaRowset::make_snapshot(const std::string& snapshot_path,
return OLAP_SUCCESS;
}

OLAPStatus AlphaRowset::copy_files_to_path(const std::string& dest_path,
std::vector<std::string>* success_files) {
for (auto& segment_group : _segment_groups) {
OLAPStatus status = segment_group->copy_files_to_path(dest_path, success_files);
if (status != OLAP_SUCCESS) {
LOG(WARNING) << "copy files failed for segment group."
<< " segment_group_id:" << segment_group->segment_group_id()
<< ", dest_path:" << dest_path;
return status;
}
}
return OLAP_SUCCESS;
}

OLAPStatus AlphaRowset::convert_from_old_files(const std::string& snapshot_path,
std::vector<std::string>* success_files) {
for (auto& segment_group : _segment_groups) {
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/rowset/alpha_rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ class AlphaRowset : public Rowset {
int64_t ref_count() const override;

OLAPStatus make_snapshot(const std::string& snapshot_path,
std::vector<std::string>* success_files) override;
std::vector<std::string>* success_links) override;
OLAPStatus copy_files_to_path(const std::string& dest_path,
std::vector<std::string>* success_files) override;

OLAPStatus convert_from_old_files(const std::string& snapshot_path,
std::vector<std::string>* success_files);
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/alpha_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ OLAPStatus AlphaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
AlphaRowsetSharedPtr alpha_rowset = std::dynamic_pointer_cast<AlphaRowset>(rowset);
for (auto& segment_group : alpha_rowset->_segment_groups) {
RETURN_NOT_OK(_init());
RETURN_NOT_OK(segment_group->copy_segments_to_path(_rowset_writer_context.rowset_path_prefix,
RETURN_NOT_OK(segment_group->link_segments_to_path(_rowset_writer_context.rowset_path_prefix,
_rowset_writer_context.rowset_id));
_cur_segment_group->set_empty(segment_group->empty());
_cur_segment_group->set_num_segments(segment_group->num_segments());
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/rowset/rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ class Rowset : public std::enable_shared_from_this<Rowset> {
virtual int64_t ref_count() const = 0;

virtual OLAPStatus make_snapshot(const std::string& snapshot_path,
std::vector<std::string>* success_files) = 0;
std::vector<std::string>* success_links) = 0;
virtual OLAPStatus copy_files_to_path(const std::string& dest_path,
std::vector<std::string>* success_files) = 0;

virtual OLAPStatus remove_old_files(std::vector<std::string>* files_to_remove) = 0;

Expand Down
39 changes: 38 additions & 1 deletion be/src/olap/rowset/segment_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,43 @@ OLAPStatus SegmentGroup::make_snapshot(const std::string& snapshot_path,
return OLAP_SUCCESS;
}

OLAPStatus SegmentGroup::copy_files_to_path(const std::string& dest_path,
std::vector<std::string>* success_files) {
if (_empty) {
return OLAP_SUCCESS;
}
for (int segment_id = 0; segment_id < _num_segments; segment_id++) {
std::string dest_data_file = construct_data_file_path(dest_path, segment_id);
if (check_dir_existed(dest_data_file)) {
LOG(WARNING) << "file already exists:" << dest_data_file;
return OLAP_ERR_FILE_ALREADY_EXIST;
}
std::string data_file_to_copy = construct_data_file_path(segment_id);
if (copy_file(data_file_to_copy, dest_data_file) != OLAP_SUCCESS) {
LOG(WARNING) << "fail to copy data file. from=" << data_file_to_copy
<< ", to=" << dest_data_file
<< ", errno=" << Errno::no();
return OLAP_ERR_OS_ERROR;
}
success_files->push_back(dest_data_file);
std::string dest_index_file = construct_index_file_path(dest_path, segment_id);
if (check_dir_existed(dest_index_file)) {
LOG(WARNING) << "file already exists:" << dest_index_file;
return OLAP_ERR_FILE_ALREADY_EXIST;
}
std::string index_file_to_copy = construct_index_file_path(segment_id);
if (copy_file(index_file_to_copy, dest_index_file) != OLAP_SUCCESS) {
LOG(WARNING) << "fail to copy index file. from=" << index_file_to_copy
<< ", to=" << dest_index_file
<< ", errno=" << Errno::no();
return OLAP_ERR_OS_ERROR;
}
success_files->push_back(dest_index_file);
}
return OLAP_SUCCESS;
}


OLAPStatus SegmentGroup::convert_from_old_files(const std::string& snapshot_path,
std::vector<std::string>* success_links) {
if (_empty) {
Expand Down Expand Up @@ -828,7 +865,7 @@ OLAPStatus SegmentGroup::remove_old_files(std::vector<std::string>* links_to_rem
return OLAP_SUCCESS;
}

OLAPStatus SegmentGroup::copy_segments_to_path(const std::string& dest_path, int64_t rowset_id) {
OLAPStatus SegmentGroup::link_segments_to_path(const std::string& dest_path, int64_t rowset_id) {
if (dest_path.empty()) {
LOG(WARNING) << "dest path is empty, return error";
return OLAP_ERR_INPUT_PARAMETER_ERROR;
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/rowset/segment_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,10 @@ class SegmentGroup {

OLAPStatus make_snapshot(const std::string& snapshot_path,
std::vector<std::string>* success_links);
OLAPStatus copy_files_to_path(const std::string& dest_path,
std::vector<std::string>* success_files);

OLAPStatus copy_segments_to_path(const std::string& dest_path, int64_t rowset_id);
OLAPStatus link_segments_to_path(const std::string& dest_path, int64_t rowset_id);

private:

Expand Down
12 changes: 0 additions & 12 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -380,18 +380,6 @@ inline size_t Tablet::row_size() const {
return _schema.row_size();
}

inline OLAPStatus Tablet::try_migration_rdlock() {
return _migration_lock.tryrdlock();
}

inline OLAPStatus Tablet::try_migration_wrlock() {
return _migration_lock.trywrlock();
}

inline void Tablet::release_migration_lock() {
_migration_lock.unlock();
}

}

#endif // DORIS_BE_SRC_OLAP_TABLET_H
29 changes: 19 additions & 10 deletions be/src/olap/task/engine_storage_migration_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,14 @@ OLAPStatus EngineStorageMigrationTask::_storage_medium_migrate(
<< "schema_hash_path=" << schema_hash_path;
remove_all_dir(schema_hash_path);
}
TabletMetaSharedPtr new_tablet_meta(new(std::nothrow) TabletMeta());
res = TabletMetaManager::get_header(stores[0], tablet->tablet_id(), tablet->schema_hash(), new_tablet_meta);
if (res != OLAP_ERR_META_KEY_NOT_FOUND) {
LOG(WARNING) << "tablet_meta already exists. "
<< "data_dir:" << stores[0]->path()
<< "tablet:" << tablet->full_name();
return OLAP_ERR_META_ALREADY_EXIST;
}
create_dirs(schema_hash_path);

// migrate all index and data files but header file
Expand All @@ -127,12 +135,6 @@ OLAPStatus EngineStorageMigrationTask::_storage_medium_migrate(
break;
}

// generate new header file from the old
TabletMetaSharedPtr new_tablet_meta(new(std::nothrow) TabletMeta());
if (new_tablet_meta == nullptr) {
LOG(WARNING) << "new olap header failed";
return OLAP_ERR_BUFFER_OVERFLOW;
}
res = _generate_new_header(stores[0], shard, tablet, consistent_rowsets, new_tablet_meta);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "fail to generate new header file from the old. res=" << res;
Expand Down Expand Up @@ -219,12 +221,19 @@ OLAPStatus EngineStorageMigrationTask::_copy_index_and_data_files(
const string& schema_hash_path,
const TabletSharedPtr& ref_tablet,
std::vector<RowsetSharedPtr>& consistent_rowsets) {
// TODO(lcy). copy function should be implemented
std::vector<std::string> success_files;
OLAPStatus status = OLAP_SUCCESS;
for (auto& rs : consistent_rowsets) {
std::vector<std::string> success_files;
RETURN_NOT_OK(rs->make_snapshot(schema_hash_path, &success_files));
status = rs->copy_files_to_path(schema_hash_path, &success_files);
if (status != OLAP_SUCCESS) {
if (remove_all_dir(schema_hash_path) != OLAP_SUCCESS) {
LOG(FATAL) << "remove storage migration path failed. "
<< "schema_hash_path:" << schema_hash_path;
}
break;
}
}
return OLAP_SUCCESS;
return status;
}

} // doris