Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/rowset/segment_v2/inverted_index_compaction.h"
#include "olap/rowset/segment_v2/inverted_index_compound_directory.h"
#include "olap/storage_engine.h"
#include "olap/storage_policy.h"
#include "olap/tablet.h"
Expand Down Expand Up @@ -546,13 +547,18 @@ Status Compaction::construct_output_rowset_writer(RowsetWriterContext& ctx, bool
BetaRowsetSharedPtr rowset =
std::static_pointer_cast<BetaRowset>(src_rs);
if (rowset == nullptr) {
LOG(WARNING) << "tablet[" << _tablet->tablet_id()
<< "] rowset is null, will skip index compaction";
return false;
}
auto fs = rowset->rowset_meta()->fs();

auto index_meta =
rowset->tablet_schema()->get_inverted_index(unique_id);
if (index_meta == nullptr) {
LOG(WARNING) << "tablet[" << _tablet->tablet_id()
<< "] index_unique_id[" << unique_id
<< "] index meta is null, will skip index compaction";
return false;
}
for (auto i = 0; i < rowset->num_segments(); i++) {
Expand All @@ -567,10 +573,48 @@ Status Compaction::construct_output_rowset_writer(RowsetWriterContext& ctx, bool
return false;
}
if (!exists) {
LOG(WARNING) << inverted_index_src_file_path
LOG(WARNING) << "tablet[" << _tablet->tablet_id()
<< "] index_unique_id[" << unique_id << "],"
<< inverted_index_src_file_path
<< " is not exists, will skip index compaction";
return false;
}

// check idx file size
int64_t file_size = 0;
if (fs->file_size(inverted_index_src_file_path, &file_size) !=
Status::OK()) {
LOG(ERROR) << inverted_index_src_file_path
<< " fs->file_size error";
return false;
}
if (file_size == 0) {
LOG(WARNING) << "tablet[" << _tablet->tablet_id()
<< "] index_unique_id[" << unique_id << "],"
<< inverted_index_src_file_path
<< " is empty file, will skip index compaction";
return false;
}

// check index meta
std::filesystem::path p(inverted_index_src_file_path);
std::string dir_str = p.parent_path().string();
std::string file_str = p.filename().string();
lucene::store::Directory* dir =
DorisCompoundDirectory::getDirectory(fs, dir_str.c_str());
auto reader = new DorisCompoundReader(dir, file_str.c_str());
std::vector<std::string> files;
reader->list(&files);

// why is 3?
// bkd index will write at least 3 files
if (files.size() < 3) {
LOG(WARNING) << "tablet[" << _tablet->tablet_id()
<< "] index_unique_id[" << unique_id << "],"
<< inverted_index_src_file_path
<< " is corrupted, will skip index compaction";
return false;
}
}
return true;
});
Expand Down