Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1098,6 +1098,8 @@ DEFINE_mInt64(LZ4_HC_compression_level, "9");
DEFINE_mBool(enable_merge_on_write_correctness_check, "true");
// rowid conversion correctness check when compaction for mow table
DEFINE_mBool(enable_rowid_conversion_correctness_check, "false");
// missing rows correctness check when compaction for mow table
DEFINE_mBool(enable_missing_rows_correctness_check, "false");
// When the number of missing versions is more than this value, do not directly
// retry the publish and handle it through async publish.
DEFINE_mInt32(mow_publish_max_discontinuous_version_num, "20");
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1159,6 +1159,8 @@ DECLARE_mInt64(variant_threshold_rows_to_estimate_sparse_column);
DECLARE_mBool(enable_merge_on_write_correctness_check);
// rowid conversion correctness check when compaction for mow table
DECLARE_mBool(enable_rowid_conversion_correctness_check);
// missing rows correctness check when compaction for mow table
DECLARE_mBool(enable_missing_rows_correctness_check);
// When the number of missing versions is more than this value, do not directly
// retry the publish and handle it through async publish.
DECLARE_mInt32(mow_publish_max_discontinuous_version_num);
Expand Down
49 changes: 28 additions & 21 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -895,22 +895,30 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) {
_tablet->tablet_schema()->cluster_key_idxes().empty()) {
Version version = _tablet->max_version();
DeleteBitmap output_rowset_delete_bitmap(_tablet->tablet_id());
std::set<RowLocation> missed_rows;
std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>> location_map;
std::unique_ptr<RowLocationSet> missed_rows;
if (config::enable_missing_rows_correctness_check && !allow_delete_in_cumu_compaction() &&
compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) {
missed_rows = std::make_unique<RowLocationSet>();
LOG(INFO) << "RowLocation Set inited succ for tablet:" << _tablet->tablet_id();
}
std::unique_ptr<std::map<RowsetSharedPtr, RowLocationPairList>> location_map;
if (config::enable_rowid_conversion_correctness_check) {
location_map = std::make_unique<std::map<RowsetSharedPtr, RowLocationPairList>>();
LOG(INFO) << "Location Map inited succ for tablet:" << _tablet->tablet_id();
}
// Convert the delete bitmap of the input rowsets to output rowset.
// New loads are not blocked, so some keys of input rowsets might
// be deleted during the time. We need to deal with delete bitmap
// of incremental data later.
// TODO(LiaoXin): check if there are duplicate keys
std::size_t missed_rows_size = 0;
_tablet->calc_compaction_output_rowset_delete_bitmap(
_input_rowsets, _rowid_conversion, 0, version.second + 1, &missed_rows,
&location_map, _tablet->tablet_meta()->delete_bitmap(),
_input_rowsets, _rowid_conversion, 0, version.second + 1, missed_rows.get(),
location_map.get(), _tablet->tablet_meta()->delete_bitmap(),
&output_rowset_delete_bitmap);
if (!allow_delete_in_cumu_compaction()) {
missed_rows_size = missed_rows.size();
if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION &&
_tablet->tablet_state() == TABLET_RUNNING && stats != nullptr &&
if (missed_rows) {
missed_rows_size = missed_rows->size();
if (_tablet->tablet_state() == TABLET_RUNNING && stats != nullptr &&
stats->merged_rows != missed_rows_size) {
std::stringstream ss;
ss << "cumulative compaction: the merged rows(" << stats->merged_rows
Expand All @@ -936,9 +944,9 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) {
}

if (config::enable_rowid_conversion_correctness_check) {
RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, location_map));
RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, *location_map));
location_map->clear();
}
location_map.clear();

{
std::lock_guard<std::mutex> wrlock_(_tablet->get_rowset_update_lock());
Expand All @@ -965,8 +973,8 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) {
}
DeleteBitmap txn_output_delete_bitmap(_tablet->tablet_id());
_tablet->calc_compaction_output_rowset_delete_bitmap(
_input_rowsets, _rowid_conversion, 0, UINT64_MAX, &missed_rows,
&location_map, *it.delete_bitmap.get(), &txn_output_delete_bitmap);
_input_rowsets, _rowid_conversion, 0, UINT64_MAX, missed_rows.get(),
location_map.get(), *it.delete_bitmap.get(), &txn_output_delete_bitmap);
if (config::enable_merge_on_write_correctness_check) {
RowsetIdUnorderedSet rowsetids;
rowsetids.insert(_output_rowset->rowset_id());
Expand All @@ -985,21 +993,20 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) {
// Convert the delete bitmap of the input rowsets to output rowset for
// incremental data.
_tablet->calc_compaction_output_rowset_delete_bitmap(
_input_rowsets, _rowid_conversion, version.second, UINT64_MAX, &missed_rows,
&location_map, _tablet->tablet_meta()->delete_bitmap(),
_input_rowsets, _rowid_conversion, version.second, UINT64_MAX,
missed_rows.get(), location_map.get(), _tablet->tablet_meta()->delete_bitmap(),
&output_rowset_delete_bitmap);

if (!allow_delete_in_cumu_compaction() &&
compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) {
DCHECK_EQ(missed_rows.size(), missed_rows_size);
if (missed_rows.size() != missed_rows_size) {
if (missed_rows) {
DCHECK_EQ(missed_rows->size(), missed_rows_size);
if (missed_rows->size() != missed_rows_size) {
LOG(WARNING) << "missed rows don't match, before: " << missed_rows_size
<< " after: " << missed_rows.size();
<< " after: " << missed_rows->size();
}
}

if (config::enable_rowid_conversion_correctness_check) {
RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, location_map));
if (location_map) {
RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, *location_map));
}

_tablet->merge_delete_bitmap(output_rowset_delete_bitmap);
Expand Down
8 changes: 6 additions & 2 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3617,15 +3617,19 @@ void Tablet::calc_compaction_output_rowset_delete_bitmap(
<< " src loaction: |" << src.rowset_id << "|"
<< src.segment_id << "|" << src.row_id
<< " version: " << cur_version;
missed_rows->insert(src);
if (missed_rows) {
missed_rows->insert(src);
}
continue;
}
VLOG_DEBUG << "calc_compaction_output_rowset_delete_bitmap dst location: |"
<< dst.rowset_id << "|" << dst.segment_id << "|" << dst.row_id
<< " src location: |" << src.rowset_id << "|" << src.segment_id
<< "|" << src.row_id << " start version: " << start_version
<< "end version" << end_version;
(*location_map)[rowset].emplace_back(src, dst);
if (location_map) {
(*location_map)[rowset].emplace_back(src, dst);
}
output_rowset_delete_bitmap->add({dst.rowset_id, dst.segment_id, cur_version},
dst.row_id);
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,8 @@ struct RowLocation {
}
}
};
using RowLocationSet = std::set<RowLocation>;
using RowLocationPairList = std::list<std::pair<RowLocation, RowLocation>>;

struct GlobalRowLoacation {
GlobalRowLoacation(int64_t tid, RowsetId rsid, uint32_t sid, uint32_t rid)
Expand Down
1 change: 1 addition & 0 deletions regression-test/pipeline/external/conf/be.conf
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ user_files_secure_path=/
enable_debug_points=true
# debug scanner context dead loop
enable_debug_log_timeout_secs=0
enable_missing_rows_correctness_check=true

#enable_jvm_monitor = true

1 change: 1 addition & 0 deletions regression-test/pipeline/p0/conf/be.conf
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ user_files_secure_path=/
enable_debug_points=true
# debug scanner context dead loop
enable_debug_log_timeout_secs=0
enable_missing_rows_correctness_check=true

#enable_jvm_monitor = true

1 change: 1 addition & 0 deletions regression-test/pipeline/p1/conf/be.conf
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ enable_fuzzy_mode=true
enable_set_in_bitmap_value=true
enable_feature_binlog=true
max_sys_mem_available_low_water_mark_bytes=69206016
enable_missing_rows_correctness_check=true

enable_jvm_monitor = true