Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ BaseTablet::BaseTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir)
_data_dir(data_dir) {
_gen_tablet_path();

std::stringstream ss;
ss << _tablet_meta->tablet_id() << "." << _tablet_meta->schema_hash() << "."
<< _tablet_meta->tablet_uid().to_string();
_full_name = ss.str();

_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(
strings::Substitute("Tablet.$0", tablet_id()),
{{"tablet_id", std::to_string(tablet_id())}},
Expand Down
7 changes: 3 additions & 4 deletions be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ class BaseTablet : public std::enable_shared_from_this<BaseTablet> {

// metrics of this tablet
std::shared_ptr<MetricEntity> _metric_entity = nullptr;

std::string _full_name;
public:
IntCounter* query_scan_bytes;
IntCounter* query_scan_rows;
Expand Down Expand Up @@ -110,10 +112,7 @@ inline int64_t BaseTablet::table_id() const {
}

// Returns the tablet's full name in the form
// "<tablet_id>.<schema_hash>.<tablet_uid>".
//
// The string is assembled once in the BaseTablet constructor and stored in
// _full_name, so this accessor only returns the cached value instead of
// rebuilding it through a std::stringstream on every call.
inline const std::string BaseTablet::full_name() const {
    return _full_name;
}

inline int64_t BaseTablet::partition_id() const {
Expand Down
27 changes: 27 additions & 0 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,33 @@ OLAPStatus Compaction::gc_unused_rowsets() {
return OLAP_SUCCESS;
}

// Trims "rowsets" in place so that it only keeps its longest run of
// consecutive versions, counted from the first element.
//
// Two neighbouring rowsets are consecutive when the start version of the later
// one equals the end version of the earlier one plus 1. At the first pair that
// breaks this rule, the versions of the rowset before and the rowset after the
// gap are appended (in that order) to "missing_version" when it is not null,
// and everything from the gap onwards is dropped from "rowsets".
//
// An empty "rowsets" is left untouched. Always returns OLAP_SUCCESS.
OLAPStatus Compaction::find_longest_consecutive_version(
        vector<RowsetSharedPtr>* rowsets,
        vector<Version>* missing_version) {
    if (rowsets->empty()) {
        return OLAP_SUCCESS;
    }

    // Number of leading rowsets known to form a consecutive chain.
    size_t keep = 1;
    while (keep < rowsets->size()) {
        const RowsetSharedPtr& before = (*rowsets)[keep - 1];
        const RowsetSharedPtr& after = (*rowsets)[keep];
        if (after->start_version() == before->end_version() + 1) {
            ++keep;
            continue;
        }
        // Found a gap: report the two rowsets surrounding it and stop scanning.
        if (missing_version != nullptr) {
            missing_version->push_back(before->version());
            missing_version->push_back(after->version());
        }
        break;
    }

    rowsets->resize(keep);
    return OLAP_SUCCESS;
}

OLAPStatus Compaction::check_version_continuity(const vector<RowsetSharedPtr>& rowsets) {
RowsetSharedPtr prev_rowset = rowsets.front();
for (size_t i = 1; i < rowsets.size(); ++i) {
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class Compaction {

OLAPStatus check_version_continuity(const std::vector<RowsetSharedPtr>& rowsets);
OLAPStatus check_correctness(const Merger::Statistics& stats);
// Truncates "rowsets" in place to its longest prefix of consecutive versions.
// If "missing_version" is not null and a gap is found, the versions of the two
// rowsets on either side of the first gap are appended to it.
OLAPStatus find_longest_consecutive_version(std::vector<RowsetSharedPtr>* rowsets, std::vector<Version>* missing_version);

// semaphore used to limit the concurrency of running compaction tasks
static Semaphore _concurrency_sem;
Expand Down
14 changes: 12 additions & 2 deletions be/src/olap/cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,18 @@ OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() {
return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSIONS;
}

RETURN_NOT_OK(check_version_continuity(candidate_rowsets));
// candidate_rowsets may not be continuous, because some rowsets may have been skipped
// while their protection time (config::cumulative_compaction_skip_window_seconds) has not expired.
// So we need to choose the longest continuous path from it.
std::vector<Version> missing_versions;
RETURN_NOT_OK(find_longest_consecutive_version(&candidate_rowsets, &missing_versions));
if (!missing_versions.empty()) {
DCHECK(missing_versions.size() == 2);
LOG(WARNING) << "There are missed versions among rowsets. "
<< "prev rowset verison=" << missing_versions[0]
<< ", next rowset version=" << missing_versions[1]
<< ", tablet=" << _tablet->full_name();
}

size_t compaction_score = 0;
int transient_size = _tablet->cumulative_compaction_policy()->pick_input_rowsets(
Expand Down Expand Up @@ -149,4 +160,3 @@ OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() {
}

} // namespace doris

3 changes: 1 addition & 2 deletions be/src/olap/cumulative_compaction_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,6 @@ void CumulativeCompactionPolicy::pick_candicate_rowsets(int64_t skip_window_sec,
}
}
std::sort(candidate_rowsets->begin(), candidate_rowsets->end(), Rowset::comparator);

}

std::unique_ptr<CumulativeCompactionPolicy> CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(std::string type) {
Expand Down Expand Up @@ -467,4 +466,4 @@ void CumulativeCompactionPolicyFactory::_parse_cumulative_compaction_policy(std:
*policy_type = NUM_BASED_POLICY;
}
}
}
}
57 changes: 56 additions & 1 deletion be/test/olap/cumulative_compaction_policy_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "olap/tablet_meta.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/cumulative_compaction.h"
#include "olap/cumulative_compaction_policy.h"

namespace doris {
Expand Down Expand Up @@ -638,6 +639,24 @@ class TestSizeBasedCumulativeCompactionPolicy : public testing::Test {
rs_metas->push_back(ptr5);
}

void init_rs_meta_missing_version(std::vector<RowsetMetaSharedPtr>* rs_metas) {
RowsetMetaSharedPtr ptr1(new RowsetMeta());
init_rs_meta(ptr1, 0, 0);
rs_metas->push_back(ptr1);

RowsetMetaSharedPtr ptr2(new RowsetMeta());
init_rs_meta(ptr2, 1, 1);
rs_metas->push_back(ptr2);

RowsetMetaSharedPtr ptr3(new RowsetMeta());
init_rs_meta(ptr3, 2, 2);
rs_metas->push_back(ptr3);

RowsetMetaSharedPtr ptr5(new RowsetMeta());
init_rs_meta(ptr5, 4, 4);
rs_metas->push_back(ptr5);
}

protected:
std::string _json_rowset_meta;
TabletMetaSharedPtr _tablet_meta;
Expand Down Expand Up @@ -1013,10 +1032,46 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, _level_size) {
ASSERT_EQ(134217728, policy->_levels[2]);
ASSERT_EQ(67108864, policy->_levels[3]);
}

// Checks that Compaction::find_longest_consecutive_version() cuts a rowset list
// down to its leading run of consecutive versions, for three inputs: a list
// with a gap, a single-rowset list, and an empty list.
TEST_F(TestSizeBasedCumulativeCompactionPolicy, _pick_missing_version_cumulative_compaction) {
    std::vector<RowsetMetaSharedPtr> rs_metas;
    init_rs_meta_missing_version(&rs_metas);

    for (auto& rowset : rs_metas) {
        _tablet_meta->add_rs_meta(rowset);
    }

    TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
    _tablet->init();

    // has missing version: [0-0], [1-1], [2-2], [4-4] (version 3 is absent)
    std::vector<RowsetSharedPtr> rowsets;
    rowsets.push_back(_tablet->get_rowset_by_version({0, 0}));
    rowsets.push_back(_tablet->get_rowset_by_version({1, 1}));
    rowsets.push_back(_tablet->get_rowset_by_version({2, 2}));
    rowsets.push_back(_tablet->get_rowset_by_version({4, 4}));
    std::shared_ptr<MemTracker> mem_tracker(new MemTracker());
    CumulativeCompaction compaction(_tablet, "label", mem_tracker);
    std::vector<Version> missing_versions;
    compaction.find_longest_consecutive_version(&rowsets, &missing_versions);
    ASSERT_EQ(3, rowsets.size());
    ASSERT_EQ(2, rowsets[2]->end_version());
    // The rowsets just before and just after the gap must both be reported.
    ASSERT_EQ(2, missing_versions.size());

    // no missing version: a single rowset is always consecutive.
    // The declaration has no default for missing_version, so pass nullptr explicitly.
    std::vector<RowsetSharedPtr> rowsets2;
    rowsets2.push_back(_tablet->get_rowset_by_version({0, 0}));
    compaction.find_longest_consecutive_version(&rowsets2, nullptr);
    ASSERT_EQ(1, rowsets2.size());
    // Assert on rowsets2 (the vector under test), not on the earlier "rowsets".
    ASSERT_EQ(0, rowsets2[0]->end_version());

    // no version: an empty input must stay empty
    std::vector<RowsetSharedPtr> rowsets3;
    compaction.find_longest_consecutive_version(&rowsets3, nullptr);
    ASSERT_EQ(0, rowsets3.size());
}
}

// @brief Test entry point: hands the command-line arguments to GoogleTest and
// returns the result of running every registered test.
int main(int argc, char** argv) {
    testing::InitGoogleTest(&argc, argv);
    const int exit_code = RUN_ALL_TESTS();
    return exit_code;
}
}