Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,9 +324,25 @@ Status CompactionMixin::do_compact_ordered_rowsets() {

void CompactionMixin::build_basic_info() {
for (auto& rowset : _input_rowsets) {
_input_rowsets_data_size += rowset->data_disk_size();
_input_rowsets_index_size += rowset->index_disk_size();
_input_rowsets_total_size += rowset->total_disk_size();
const auto& rowset_meta = rowset->rowset_meta();
auto index_size = rowset_meta->index_disk_size();
auto total_size = rowset_meta->total_disk_size();
auto data_size = rowset_meta->data_disk_size();
// corrupted index size caused by bug before 2.1.5 or 3.0.0 version
// try to get real index size from disk.
if (index_size < 0 || index_size > total_size * 2) {
LOG(ERROR) << "invalid index size:" << index_size << " total size:" << total_size
<< " data size:" << data_size << " tablet:" << rowset_meta->tablet_id()
<< " rowset:" << rowset_meta->rowset_id();
index_size = 0;
auto st = rowset->get_inverted_index_size(&index_size);
if (!st.ok()) {
LOG(ERROR) << "failed to get inverted index size. res=" << st;
}
}
_input_rowsets_data_size += data_size;
_input_rowsets_index_size += index_size;
_input_rowsets_total_size += total_size;
_input_row_num += rowset->num_rows();
_input_num_segments += rowset->num_segments();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/beta_rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ Status BetaRowset::do_load(bool /*use_cache*/) {
return Status::OK();
}

Status BetaRowset::get_inverted_index_size(size_t* index_size) {
Status BetaRowset::get_inverted_index_size(int64_t* index_size) {
const auto& fs = _rowset_meta->fs();
if (!fs) {
return Status::Error<INIT_FAILED>("get fs failed, resource_id={}",
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/beta_rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class BetaRowset final : public Rowset {

Status get_segments_size(std::vector<size_t>* segments_size);

Status get_inverted_index_size(size_t* index_size);
Status get_inverted_index_size(int64_t* index_size) override;

[[nodiscard]] virtual Status add_to_binlog() override;

Expand Down
23 changes: 21 additions & 2 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -652,8 +652,27 @@ Status BaseBetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
assert(rowset->rowset_meta()->rowset_type() == BETA_ROWSET);
RETURN_IF_ERROR(rowset->link_files_to(_context.tablet_path, _context.rowset_id));
_num_rows_written += rowset->num_rows();
_total_data_size += rowset->rowset_meta()->data_disk_size();
_total_index_size += rowset->rowset_meta()->index_disk_size();
const auto& rowset_meta = rowset->rowset_meta();
auto index_size = rowset_meta->index_disk_size();
auto total_size = rowset_meta->total_disk_size();
auto data_size = rowset_meta->data_disk_size();
// corrupted index size caused by bug before 2.1.5 or 3.0.0 version
// try to get real index size from disk.
if (index_size < 0 || index_size > total_size * 2) {
LOG(ERROR) << "invalid index size:" << index_size << " total size:" << total_size
<< " data size:" << data_size << " tablet:" << rowset_meta->tablet_id()
<< " rowset:" << rowset_meta->rowset_id();
index_size = 0;
auto st = rowset->get_inverted_index_size(&index_size);
if (!st.ok()) {
if (!st.is<NOT_FOUND>()) {
LOG(ERROR) << "failed to get inverted index size. res=" << st;
return st;
}
}
}
_total_data_size += data_size;
_total_index_size += index_size;
_num_segment += cast_set<int32_t>(rowset->num_segments());
// append key_bounds to current rowset
RETURN_IF_ERROR(rowset->get_segments_key_bounds(&_segments_encoded_key_bounds));
Expand Down
8 changes: 5 additions & 3 deletions be/src/olap/rowset/rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ class Rowset : public std::enable_shared_from_this<Rowset>, public MetadataAdder
// helper class to access RowsetMeta
int64_t start_version() const { return rowset_meta()->version().first; }
int64_t end_version() const { return rowset_meta()->version().second; }
size_t index_disk_size() const { return rowset_meta()->index_disk_size(); }
size_t data_disk_size() const { return rowset_meta()->data_disk_size(); }
size_t total_disk_size() const { return rowset_meta()->total_disk_size(); }
int64_t index_disk_size() const { return rowset_meta()->index_disk_size(); }
int64_t data_disk_size() const { return rowset_meta()->data_disk_size(); }
int64_t total_disk_size() const { return rowset_meta()->total_disk_size(); }
bool empty() const { return rowset_meta()->empty(); }
bool zero_num_rows() const { return rowset_meta()->num_rows() == 0; }
size_t num_rows() const { return rowset_meta()->num_rows(); }
Expand Down Expand Up @@ -210,6 +210,8 @@ class Rowset : public std::enable_shared_from_this<Rowset>, public MetadataAdder
size_t new_rowset_start_seg_id = 0,
std::set<int64_t>* without_index_uids = nullptr) = 0;

virtual Status get_inverted_index_size(int64_t* index_size) = 0;

// copy all files to `dir`
virtual Status copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) = 0;

Expand Down
12 changes: 6 additions & 6 deletions be/src/olap/rowset/rowset_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,21 +130,21 @@ class RowsetMeta : public MetadataAdder<RowsetMeta> {

void set_num_rows(int64_t num_rows) { _rowset_meta_pb.set_num_rows(num_rows); }

size_t total_disk_size() const { return _rowset_meta_pb.total_disk_size(); }
int64_t total_disk_size() const { return _rowset_meta_pb.total_disk_size(); }

void set_total_disk_size(size_t total_disk_size) {
void set_total_disk_size(int64_t total_disk_size) {
_rowset_meta_pb.set_total_disk_size(total_disk_size);
}

size_t data_disk_size() const { return _rowset_meta_pb.data_disk_size(); }
int64_t data_disk_size() const { return _rowset_meta_pb.data_disk_size(); }

void set_data_disk_size(size_t data_disk_size) {
void set_data_disk_size(int64_t data_disk_size) {
_rowset_meta_pb.set_data_disk_size(data_disk_size);
}

size_t index_disk_size() const { return _rowset_meta_pb.index_disk_size(); }
int64_t index_disk_size() const { return _rowset_meta_pb.index_disk_size(); }

void set_index_disk_size(size_t index_disk_size) {
void set_index_disk_size(int64_t index_disk_size) {
_rowset_meta_pb.set_index_disk_size(index_disk_size);
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/task/index_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ Status IndexBuilder::update_inverted_index_info() {
TabletSchemaSPtr output_rs_tablet_schema = std::make_shared<TabletSchema>();
const auto& input_rs_tablet_schema = input_rowset->tablet_schema();
output_rs_tablet_schema->copy_from(*input_rs_tablet_schema);
size_t total_index_size = 0;
int64_t total_index_size = 0;
auto* beta_rowset = reinterpret_cast<BetaRowset*>(input_rowset.get());
auto size_st = beta_rowset->get_inverted_index_size(&total_index_size);
DBUG_EXECUTE_IF("IndexBuilder::update_inverted_index_info_size_st_not_ok", {
Expand Down
138 changes: 137 additions & 1 deletion be/test/olap/ordered_data_compaction_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ class OrderedDataCompactionTest : public ::testing::Test {
EXPECT_TRUE(io::global_local_filesystem()
->create_directory(absolute_dir + "/tablet_path")
.ok());
// tmp dir
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(tmp_dir).ok());
EXPECT_TRUE(io::global_local_filesystem()->create_directory(tmp_dir).ok());
std::vector<StorePath> paths;
paths.emplace_back(std::string(tmp_dir), 1024000000);
auto tmp_file_dirs = std::make_unique<segment_v2::TmpFileDirs>(paths);
st = tmp_file_dirs->init();
EXPECT_TRUE(st.ok()) << st.to_json();
ExecEnv::GetInstance()->set_tmp_file_dir(std::move(tmp_file_dirs));

doris::EngineOptions options;
auto engine = std::make_unique<StorageEngine>(options);
Expand Down Expand Up @@ -153,6 +162,62 @@ class OrderedDataCompactionTest : public ::testing::Test {
return tablet_schema;
}

TabletSchemaSPtr create_inverted_index_v1_schema(KeysType keys_type = DUP_KEYS) {
TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
TabletSchemaPB tablet_schema_pb;
tablet_schema_pb.set_keys_type(keys_type);
tablet_schema_pb.set_num_short_key_columns(1);
tablet_schema_pb.set_num_rows_per_row_block(1024);
tablet_schema_pb.set_compress_kind(COMPRESS_NONE);
tablet_schema_pb.set_next_column_unique_id(4);
tablet_schema_pb.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1);

auto* index_pb = tablet_schema_pb.add_index();
index_pb->set_index_id(1);
index_pb->set_index_name("c1_index");
index_pb->set_index_type(IndexType::INVERTED);
index_pb->add_col_unique_id(2);

ColumnPB* column_1 = tablet_schema_pb.add_column();
column_1->set_unique_id(1);
column_1->set_name("c1");
column_1->set_type("INT");
column_1->set_is_key(true);
column_1->set_length(4);
column_1->set_index_length(4);
column_1->set_is_nullable(false);
column_1->set_is_bf_column(false);

ColumnPB* column_2 = tablet_schema_pb.add_column();
column_2->set_unique_id(2);
column_2->set_name("c2");
column_2->set_type("INT");
column_2->set_length(4);
column_2->set_index_length(4);
column_2->set_is_nullable(true);
column_2->set_is_key(false);
column_2->set_is_nullable(false);
column_2->set_is_bf_column(false);

// unique table must contains the DELETE_SIGN column
if (keys_type == UNIQUE_KEYS) {
ColumnPB* column_3 = tablet_schema_pb.add_column();
column_3->set_unique_id(3);
column_3->set_name(DELETE_SIGN);
column_3->set_type("TINYINT");
column_3->set_length(1);
column_3->set_index_length(1);
column_3->set_is_nullable(false);
column_3->set_is_key(false);
column_3->set_is_nullable(false);
column_3->set_is_bf_column(false);
}

tablet_schema->init_from_pb(tablet_schema_pb);

return tablet_schema;
}

TabletSchemaSPtr create_agg_schema() {
TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
TabletSchemaPB tablet_schema_pb;
Expand Down Expand Up @@ -401,7 +466,8 @@ class OrderedDataCompactionTest : public ::testing::Test {
}

private:
const std::string kTestDir = "/ut_dir/vertical_compaction_test";
const std::string kTestDir = "/ut_dir/ordered_compaction_test";
const std::string tmp_dir = "./ut_dir/ordered_compaction_test/tmp";
string absolute_dir;
std::unique_ptr<DataDir> _data_dir;
};
Expand Down Expand Up @@ -487,5 +553,75 @@ TEST_F(OrderedDataCompactionTest, test_01) {
}
}

TEST_F(OrderedDataCompactionTest, test_index_disk_size) {
auto num_input_rowset = 3;
auto num_segments = 2;
auto rows_per_segment = 50;
std::vector<std::vector<std::vector<std::tuple<int64_t, int64_t>>>> input_data;
generate_input_data(num_input_rowset, num_segments, rows_per_segment, input_data);

TabletSchemaSPtr tablet_schema = create_inverted_index_v1_schema();
TabletSharedPtr tablet = create_tablet(*tablet_schema, false, 10000, false);
EXPECT_TRUE(io::global_local_filesystem()->create_directory(tablet->tablet_path()).ok());

vector<RowsetSharedPtr> input_rowsets;
SegmentsOverlapPB new_overlap = NONOVERLAPPING;
for (auto i = 0; i < num_input_rowset; i++) {
RowsetWriterContext writer_context;
create_rowset_writer_context(tablet_schema, tablet->tablet_path(), new_overlap, UINT32_MAX,
&writer_context);

auto res = RowsetFactory::create_rowset_writer(*engine_ref, writer_context, false);
EXPECT_TRUE(res.has_value()) << res.error();
auto rowset_writer = std::move(res).value();

uint32_t num_rows = 0;
for (int j = 0; j < input_data[i].size(); ++j) {
vectorized::Block block = tablet_schema->create_block();
auto columns = block.mutate_columns();
for (int rid = 0; rid < input_data[i][j].size(); ++rid) {
int32_t c1 = std::get<0>(input_data[i][j][rid]);
int32_t c2 = std::get<1>(input_data[i][j][rid]);
columns[0]->insert_data((const char*)&c1, sizeof(c1));
columns[1]->insert_data((const char*)&c2, sizeof(c2));

if (tablet_schema->keys_type() == UNIQUE_KEYS) {
uint8_t num = 0;
columns[2]->insert_data((const char*)&num, sizeof(num));
}
num_rows++;
}
auto s = rowset_writer->add_block(&block);
EXPECT_TRUE(s.ok());
s = rowset_writer->flush();
EXPECT_TRUE(s.ok());
}

RowsetSharedPtr rowset;
EXPECT_EQ(Status::OK(), rowset_writer->build(rowset));
EXPECT_EQ(input_data[i].size(), rowset->rowset_meta()->num_segments());
EXPECT_EQ(num_rows, rowset->rowset_meta()->num_rows());

// Set random index_disk_size
rowset->rowset_meta()->set_index_disk_size(1024000000000000LL);
input_rowsets.push_back(rowset);
}

CumulativeCompaction cu_compaction(*engine_ref, tablet);
cu_compaction._input_rowsets = std::move(input_rowsets);
EXPECT_EQ(cu_compaction.handle_ordered_data_compaction(), true);

auto& out_rowset = cu_compaction._output_rowset;

// Verify the index_disk_size of the output rowset
int64_t expected_total_size = 0;
for (const auto& rowset : cu_compaction._input_rowsets) {
expected_total_size += rowset->rowset_meta()->total_disk_size();
}
std::cout << "expected_total_size: " << expected_total_size << std::endl;
std::cout << "actual_total_disk_size: " << out_rowset->rowset_meta()->total_disk_size()
<< std::endl;
EXPECT_EQ(out_rowset->rowset_meta()->total_disk_size(), expected_total_size);
}
} // namespace vectorized
} // namespace doris
4 changes: 4 additions & 0 deletions be/test/testutil/mock_rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ class MockRowset : public Rowset {
return Status::NotSupported("MockRowset not support this method.");
}

Status get_inverted_index_size(int64_t* index_size) override {
return Status::NotSupported("MockRowset not support this method.");
}

void clear_inverted_index_cache() override {}

Status get_segments_key_bounds(std::vector<KeyBoundsPB>* segments_key_bounds) override {
Expand Down
Loading