Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,8 @@ DEFINE_mBool(enable_merge_on_write_correctness_check, "true");
DEFINE_mBool(enable_mow_compaction_correctness_check_core, "false");
// Enables the rowid-conversion correctness check during compaction of merge-on-write (MoW) tables.
DEFINE_mBool(enable_rowid_conversion_correctness_check, "false");
// Enables the missing-rows correctness check during compaction of merge-on-write (MoW) tables.
DEFINE_mBool(enable_missing_rows_correctness_check, "false");
// When the number of missing versions is more than this value, do not directly
// retry the publish and handle it through async publish.
DEFINE_mInt32(mow_publish_max_discontinuous_version_num, "20");
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1185,6 +1185,8 @@ DECLARE_mBool(enable_merge_on_write_correctness_check);
DECLARE_mBool(enable_mow_compaction_correctness_check_core);
// Enables the rowid-conversion correctness check during compaction of merge-on-write (MoW) tables.
DECLARE_mBool(enable_rowid_conversion_correctness_check);
// Enables the missing-rows correctness check during compaction of merge-on-write (MoW) tables.
DECLARE_mBool(enable_missing_rows_correctness_check);
// When the number of missing versions is more than this value, do not directly
// retry the publish and handle it through async publish.
DECLARE_mInt32(mow_publish_max_discontinuous_version_num);
Expand Down
8 changes: 6 additions & 2 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1339,15 +1339,19 @@ void BaseTablet::calc_compaction_output_rowset_delete_bitmap(
<< " src loaction: |" << src.rowset_id << "|"
<< src.segment_id << "|" << src.row_id
<< " version: " << cur_version;
missed_rows->insert(src);
if (missed_rows) {
missed_rows->insert(src);
}
continue;
}
VLOG_DEBUG << "calc_compaction_output_rowset_delete_bitmap dst location: |"
<< dst.rowset_id << "|" << dst.segment_id << "|" << dst.row_id
<< " src location: |" << src.rowset_id << "|" << src.segment_id
<< "|" << src.row_id << " start version: " << start_version
<< "end version" << end_version;
(*location_map)[rowset].emplace_back(src, dst);
if (location_map) {
(*location_map)[rowset].emplace_back(src, dst);
}
output_rowset_delete_bitmap->add({dst.rowset_id, dst.segment_id, cur_version},
dst.row_id);
}
Expand Down
53 changes: 31 additions & 22 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -881,22 +881,32 @@ Status CompactionMixin::modify_rowsets() {
_tablet->tablet_schema()->cluster_key_idxes().empty()) {
Version version = tablet()->max_version();
DeleteBitmap output_rowset_delete_bitmap(_tablet->tablet_id());
std::set<RowLocation> missed_rows;
std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>> location_map;
std::unique_ptr<RowLocationSet> missed_rows;
if ((config::enable_missing_rows_correctness_check ||
config::enable_mow_compaction_correctness_check_core) &&
!_allow_delete_in_cumu_compaction &&
compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) {
missed_rows = std::make_unique<RowLocationSet>();
LOG(INFO) << "RowLocation Set inited succ for tablet:" << _tablet->tablet_id();
}
std::unique_ptr<std::map<RowsetSharedPtr, RowLocationPairList>> location_map;
if (config::enable_rowid_conversion_correctness_check) {
location_map = std::make_unique<std::map<RowsetSharedPtr, RowLocationPairList>>();
LOG(INFO) << "Location Map inited succ for tablet:" << _tablet->tablet_id();
}
// Convert the delete bitmap of the input rowsets to output rowset.
// New loads are not blocked, so some keys of the input rowsets might
// be deleted in the meantime. We need to deal with the delete bitmap
// of the incremental data later.
// TODO(LiaoXin): check if there are duplicate keys
std::size_t missed_rows_size = 0;
tablet()->calc_compaction_output_rowset_delete_bitmap(
_input_rowsets, _rowid_conversion, 0, version.second + 1, &missed_rows,
&location_map, _tablet->tablet_meta()->delete_bitmap(),
_input_rowsets, _rowid_conversion, 0, version.second + 1, missed_rows.get(),
location_map.get(), _tablet->tablet_meta()->delete_bitmap(),
&output_rowset_delete_bitmap);
if (!_allow_delete_in_cumu_compaction) {
missed_rows_size = missed_rows.size();
if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION &&
_tablet->tablet_state() == TABLET_RUNNING &&
if (missed_rows) {
missed_rows_size = missed_rows->size();
if (_tablet->tablet_state() == TABLET_RUNNING &&
_stats.merged_rows != missed_rows_size) {
std::stringstream ss;
ss << "cumulative compaction: the merged rows(" << _stats.merged_rows
Expand Down Expand Up @@ -930,10 +940,10 @@ Status CompactionMixin::modify_rowsets() {
}
}

if (config::enable_rowid_conversion_correctness_check) {
RETURN_IF_ERROR(tablet()->check_rowid_conversion(_output_rowset, location_map));
if (location_map) {
RETURN_IF_ERROR(tablet()->check_rowid_conversion(_output_rowset, *location_map));
location_map->clear();
}
location_map.clear();

{
std::lock_guard<std::mutex> wrlock_(tablet()->get_rowset_update_lock());
Expand All @@ -960,8 +970,8 @@ Status CompactionMixin::modify_rowsets() {
}
DeleteBitmap txn_output_delete_bitmap(_tablet->tablet_id());
tablet()->calc_compaction_output_rowset_delete_bitmap(
_input_rowsets, _rowid_conversion, 0, UINT64_MAX, &missed_rows,
&location_map, *it.delete_bitmap.get(), &txn_output_delete_bitmap);
_input_rowsets, _rowid_conversion, 0, UINT64_MAX, missed_rows.get(),
location_map.get(), *it.delete_bitmap.get(), &txn_output_delete_bitmap);
if (config::enable_merge_on_write_correctness_check) {
RowsetIdUnorderedSet rowsetids;
rowsetids.insert(_output_rowset->rowset_id());
Expand All @@ -980,21 +990,20 @@ Status CompactionMixin::modify_rowsets() {
// Convert the delete bitmap of the input rowsets to output rowset for
// incremental data.
tablet()->calc_compaction_output_rowset_delete_bitmap(
_input_rowsets, _rowid_conversion, version.second, UINT64_MAX, &missed_rows,
&location_map, _tablet->tablet_meta()->delete_bitmap(),
_input_rowsets, _rowid_conversion, version.second, UINT64_MAX,
missed_rows.get(), location_map.get(), _tablet->tablet_meta()->delete_bitmap(),
&output_rowset_delete_bitmap);

if (!_allow_delete_in_cumu_compaction &&
compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) {
DCHECK_EQ(missed_rows.size(), missed_rows_size);
if (missed_rows.size() != missed_rows_size) {
if (missed_rows) {
DCHECK_EQ(missed_rows->size(), missed_rows_size);
if (missed_rows->size() != missed_rows_size) {
LOG(WARNING) << "missed rows don't match, before: " << missed_rows_size
<< " after: " << missed_rows.size();
<< " after: " << missed_rows->size();
}
}

if (config::enable_rowid_conversion_correctness_check) {
RETURN_IF_ERROR(tablet()->check_rowid_conversion(_output_rowset, location_map));
if (location_map) {
RETURN_IF_ERROR(tablet()->check_rowid_conversion(_output_rowset, *location_map));
}

tablet()->merge_delete_bitmap(output_rowset_delete_bitmap);
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,8 @@ struct RowLocation {
}
}
};
// Ordered, duplicate-free collection of row locations.
using RowLocationSet = std::set<RowLocation>;
// List of row-location pairs; the compaction code appends (src, dst) pairs to it.
using RowLocationPairList = std::list<std::pair<RowLocation, RowLocation>>;

struct GlobalRowLoacation {
GlobalRowLoacation(int64_t tid, RowsetId rsid, uint32_t sid, uint32_t rid)
Expand Down
1 change: 1 addition & 0 deletions regression-test/pipeline/external/conf/be.conf
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ user_files_secure_path=/
enable_debug_points=true
# debug scanner context dead loop
enable_debug_log_timeout_secs=0
enable_missing_rows_correctness_check=true

trino_connector_plugin_dir=/tmp/trino_connector/connectors

Expand Down
1 change: 1 addition & 0 deletions regression-test/pipeline/p0/conf/be.conf
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ user_files_secure_path=/
enable_debug_points=true
# debug scanner context dead loop
enable_debug_log_timeout_secs=0
enable_missing_rows_correctness_check=true

trino_connector_plugin_dir=/tmp/trino_connector/connectors

Expand Down
1 change: 1 addition & 0 deletions regression-test/pipeline/p1/conf/be.conf
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ user_files_secure_path=/
enable_debug_points=true
# debug scanner context dead loop
enable_debug_log_timeout_secs=0
enable_missing_rows_correctness_check=true

enable_jvm_monitor = true