diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 4460e477c8f35e..7ef5399d9b2b3f 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1120,6 +1120,8 @@ DEFINE_mBool(enable_merge_on_write_correctness_check, "true"); DEFINE_mBool(enable_mow_compaction_correctness_check_core, "false"); // rowid conversion correctness check when compaction for mow table DEFINE_mBool(enable_rowid_conversion_correctness_check, "false"); +// missing rows correctness check when compaction for mow table +DEFINE_mBool(enable_missing_rows_correctness_check, "false"); // When the number of missing versions is more than this value, do not directly // retry the publish and handle it through async publish. DEFINE_mInt32(mow_publish_max_discontinuous_version_num, "20"); diff --git a/be/src/common/config.h b/be/src/common/config.h index dbf1800270403f..5f4fc541be3634 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1185,6 +1185,8 @@ DECLARE_mBool(enable_merge_on_write_correctness_check); DECLARE_mBool(enable_mow_compaction_correctness_check_core); // rowid conversion correctness check when compaction for mow table DECLARE_mBool(enable_rowid_conversion_correctness_check); +// missing rows correctness check when compaction for mow table +DECLARE_mBool(enable_missing_rows_correctness_check); // When the number of missing versions is more than this value, do not directly // retry the publish and handle it through async publish. DECLARE_mInt32(mow_publish_max_discontinuous_version_num); diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 611cce2c869e34..b2a5b5af659f5d 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -1339,7 +1339,9 @@ void BaseTablet::calc_compaction_output_rowset_delete_bitmap( << " src loaction: |" << src.rowset_id << "|" << src.segment_id << "|" << src.row_id << " version: " << cur_version; - missed_rows->insert(src); + if (missed_rows) { + missed_rows->insert(src); + } continue; } VLOG_DEBUG << "calc_compaction_output_rowset_delete_bitmap dst location: |" @@ -1347,7 +1349,9 @@ void BaseTablet::calc_compaction_output_rowset_delete_bitmap( << " src location: |" << src.rowset_id << "|" << src.segment_id << "|" << src.row_id << " start version: " << start_version << "end version" << end_version; - (*location_map)[rowset].emplace_back(src, dst); + if (location_map) { + (*location_map)[rowset].emplace_back(src, dst); + } output_rowset_delete_bitmap->add({dst.rowset_id, dst.segment_id, cur_version}, dst.row_id); } diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 7b00ec1c09d377..37dcac5283ee98 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -881,8 +881,19 @@ Status CompactionMixin::modify_rowsets() { _tablet->tablet_schema()->cluster_key_idxes().empty()) { Version version = tablet()->max_version(); DeleteBitmap output_rowset_delete_bitmap(_tablet->tablet_id()); - std::set missed_rows; - std::map>> location_map; + std::unique_ptr missed_rows; + if ((config::enable_missing_rows_correctness_check || + config::enable_mow_compaction_correctness_check_core) && + !_allow_delete_in_cumu_compaction && + compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) { + missed_rows = std::make_unique(); + LOG(INFO) << "RowLocation Set inited succ for tablet:" << _tablet->tablet_id(); + } + std::unique_ptr> location_map; + if (config::enable_rowid_conversion_correctness_check) { + location_map = std::make_unique>(); + LOG(INFO) << "Location Map inited succ for tablet:" << _tablet->tablet_id(); + } // Convert the delete bitmap of the input rowsets to output rowset. // New loads are not blocked, so some keys of input rowsets might // be deleted during the time. We need to deal with delete bitmap @@ -890,13 +901,12 @@ Status CompactionMixin::modify_rowsets() { // TODO(LiaoXin): check if there are duplicate keys std::size_t missed_rows_size = 0; tablet()->calc_compaction_output_rowset_delete_bitmap( - _input_rowsets, _rowid_conversion, 0, version.second + 1, &missed_rows, - &location_map, _tablet->tablet_meta()->delete_bitmap(), + _input_rowsets, _rowid_conversion, 0, version.second + 1, missed_rows.get(), + location_map.get(), _tablet->tablet_meta()->delete_bitmap(), &output_rowset_delete_bitmap); - if (!_allow_delete_in_cumu_compaction) { - missed_rows_size = missed_rows.size(); - if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION && - _tablet->tablet_state() == TABLET_RUNNING && + if (missed_rows) { + missed_rows_size = missed_rows->size(); + if (_tablet->tablet_state() == TABLET_RUNNING && _stats.merged_rows != missed_rows_size) { std::stringstream ss; ss << "cumulative compaction: the merged rows(" << _stats.merged_rows @@ -930,10 +940,10 @@ Status CompactionMixin::modify_rowsets() { } } - if (config::enable_rowid_conversion_correctness_check) { - RETURN_IF_ERROR(tablet()->check_rowid_conversion(_output_rowset, location_map)); + if (location_map) { + RETURN_IF_ERROR(tablet()->check_rowid_conversion(_output_rowset, *location_map)); + location_map->clear(); } - location_map.clear(); { std::lock_guard wrlock_(tablet()->get_rowset_update_lock()); @@ -960,8 +970,8 @@ Status CompactionMixin::modify_rowsets() { } DeleteBitmap txn_output_delete_bitmap(_tablet->tablet_id()); tablet()->calc_compaction_output_rowset_delete_bitmap( - _input_rowsets, _rowid_conversion, 0, UINT64_MAX, &missed_rows, - &location_map, *it.delete_bitmap.get(), &txn_output_delete_bitmap); + _input_rowsets, _rowid_conversion, 0, UINT64_MAX, missed_rows.get(), + location_map.get(), *it.delete_bitmap.get(), &txn_output_delete_bitmap); if (config::enable_merge_on_write_correctness_check) { RowsetIdUnorderedSet rowsetids; rowsetids.insert(_output_rowset->rowset_id()); @@ -980,21 +990,20 @@ Status CompactionMixin::modify_rowsets() { // Convert the delete bitmap of the input rowsets to output rowset for // incremental data. tablet()->calc_compaction_output_rowset_delete_bitmap( - _input_rowsets, _rowid_conversion, version.second, UINT64_MAX, &missed_rows, - &location_map, _tablet->tablet_meta()->delete_bitmap(), + _input_rowsets, _rowid_conversion, version.second, UINT64_MAX, + missed_rows.get(), location_map.get(), _tablet->tablet_meta()->delete_bitmap(), &output_rowset_delete_bitmap); - if (!_allow_delete_in_cumu_compaction && - compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) { - DCHECK_EQ(missed_rows.size(), missed_rows_size); - if (missed_rows.size() != missed_rows_size) { + if (missed_rows) { + DCHECK_EQ(missed_rows->size(), missed_rows_size); + if (missed_rows->size() != missed_rows_size) { LOG(WARNING) << "missed rows don't match, before: " << missed_rows_size - << " after: " << missed_rows.size(); + << " after: " << missed_rows->size(); } } - if (config::enable_rowid_conversion_correctness_check) { - RETURN_IF_ERROR(tablet()->check_rowid_conversion(_output_rowset, location_map)); + if (location_map) { + RETURN_IF_ERROR(tablet()->check_rowid_conversion(_output_rowset, *location_map)); } tablet()->merge_delete_bitmap(output_rowset_delete_bitmap); diff --git a/be/src/olap/utils.h b/be/src/olap/utils.h index e23e12a1b942ae..6528d2a86a4e3a 100644 --- a/be/src/olap/utils.h +++ b/be/src/olap/utils.h @@ -296,6 +296,8 @@ struct RowLocation { } } }; +using RowLocationSet = std::set; +using RowLocationPairList = std::list>; struct GlobalRowLoacation { GlobalRowLoacation(int64_t tid, RowsetId rsid, uint32_t sid, uint32_t rid) diff --git a/regression-test/pipeline/external/conf/be.conf b/regression-test/pipeline/external/conf/be.conf index a7c0713d8eb3ad..94a038cfa885fb 100644 --- a/regression-test/pipeline/external/conf/be.conf +++ b/regression-test/pipeline/external/conf/be.conf @@ -58,6 +58,7 @@ user_files_secure_path=/ enable_debug_points=true # debug scanner context dead loop enable_debug_log_timeout_secs=0 +enable_missing_rows_correctness_check=true trino_connector_plugin_dir=/tmp/trino_connector/connectors diff --git a/regression-test/pipeline/p0/conf/be.conf b/regression-test/pipeline/p0/conf/be.conf index d2f4910aa2af24..d5e447e860195e 100644 --- a/regression-test/pipeline/p0/conf/be.conf +++ b/regression-test/pipeline/p0/conf/be.conf @@ -58,6 +58,7 @@ user_files_secure_path=/ enable_debug_points=true # debug scanner context dead loop enable_debug_log_timeout_secs=0 +enable_missing_rows_correctness_check=true trino_connector_plugin_dir=/tmp/trino_connector/connectors diff --git a/regression-test/pipeline/p1/conf/be.conf b/regression-test/pipeline/p1/conf/be.conf index d278b30fb673b5..675518ac0ce464 100644 --- a/regression-test/pipeline/p1/conf/be.conf +++ b/regression-test/pipeline/p1/conf/be.conf @@ -58,6 +58,7 @@ user_files_secure_path=/ enable_debug_points=true # debug scanner context dead loop enable_debug_log_timeout_secs=0 +enable_missing_rows_correctness_check=true enable_jvm_monitor = true