From 46426cce016f6f5f68b5386893fc27b7e8e712c4 Mon Sep 17 00:00:00 2001 From: cambyzhu Date: Wed, 26 Jun 2024 16:26:06 +0800 Subject: [PATCH 1/2] reduce memory usage for mow table compaction --- be/src/common/config.cpp | 2 ++ be/src/common/config.h | 2 ++ be/src/olap/base_tablet.cpp | 8 ++++-- be/src/olap/compaction.cpp | 51 +++++++++++++++++++++---------------- be/src/olap/utils.h | 2 ++ 5 files changed, 41 insertions(+), 24 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 4460e477c8f35e..64c3475e3c1efc 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1120,6 +1120,8 @@ DEFINE_mBool(enable_merge_on_write_correctness_check, "true"); DEFINE_mBool(enable_mow_compaction_correctness_check_core, "false"); // rowid conversion correctness check when compaction for mow table DEFINE_mBool(enable_rowid_conversion_correctness_check, "false"); +// missing rows correctness check when compaction for mow table +DEFINE_mBool(enable_missing_rows_correctness_check, "true"); // When the number of missing versions is more than this value, do not directly // retry the publish and handle it through async publish. DEFINE_mInt32(mow_publish_max_discontinuous_version_num, "20"); diff --git a/be/src/common/config.h b/be/src/common/config.h index dbf1800270403f..5f4fc541be3634 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1185,6 +1185,8 @@ DECLARE_mBool(enable_merge_on_write_correctness_check); DECLARE_mBool(enable_mow_compaction_correctness_check_core); // rowid conversion correctness check when compaction for mow table DECLARE_mBool(enable_rowid_conversion_correctness_check); +// missing rows correctness check when compaction for mow table +DECLARE_mBool(enable_missing_rows_correctness_check); // When the number of missing versions is more than this value, do not directly // retry the publish and handle it through async publish. DECLARE_mInt32(mow_publish_max_discontinuous_version_num); diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 611cce2c869e34..b2a5b5af659f5d 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -1339,7 +1339,9 @@ void BaseTablet::calc_compaction_output_rowset_delete_bitmap( << " src loaction: |" << src.rowset_id << "|" << src.segment_id << "|" << src.row_id << " version: " << cur_version; - missed_rows->insert(src); + if (missed_rows) { + missed_rows->insert(src); + } continue; } VLOG_DEBUG << "calc_compaction_output_rowset_delete_bitmap dst location: |" @@ -1347,7 +1349,9 @@ void BaseTablet::calc_compaction_output_rowset_delete_bitmap( << " src location: |" << src.rowset_id << "|" << src.segment_id << "|" << src.row_id << " start version: " << start_version << "end version" << end_version; - (*location_map)[rowset].emplace_back(src, dst); + if (location_map) { + (*location_map)[rowset].emplace_back(src, dst); + } output_rowset_delete_bitmap->add({dst.rowset_id, dst.segment_id, cur_version}, dst.row_id); } diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 7b00ec1c09d377..30574572e31846 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -881,8 +881,17 @@ Status CompactionMixin::modify_rowsets() { _tablet->tablet_schema()->cluster_key_idxes().empty()) { Version version = tablet()->max_version(); DeleteBitmap output_rowset_delete_bitmap(_tablet->tablet_id()); - std::set missed_rows; - std::map>> location_map; + std::unique_ptr missed_rows; + if ((config::enable_missing_rows_correctness_check || + config::enable_mow_compaction_correctness_check_core) && + !_allow_delete_in_cumu_compaction && + compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) { + missed_rows = std::make_unique(); + } + std::unique_ptr> location_map; + if (config::enable_rowid_conversion_correctness_check) { + location_map = std::make_unique>(); + } // Convert the delete bitmap of the input rowsets to output rowset. // New loads are not blocked, so some keys of input rowsets might // be deleted during the time. We need to deal with delete bitmap @@ -890,13 +899,12 @@ Status CompactionMixin::modify_rowsets() { // TODO(LiaoXin): check if there are duplicate keys std::size_t missed_rows_size = 0; tablet()->calc_compaction_output_rowset_delete_bitmap( - _input_rowsets, _rowid_conversion, 0, version.second + 1, &missed_rows, - &location_map, _tablet->tablet_meta()->delete_bitmap(), + _input_rowsets, _rowid_conversion, 0, version.second + 1, missed_rows.get(), + location_map.get(), _tablet->tablet_meta()->delete_bitmap(), &output_rowset_delete_bitmap); - if (!_allow_delete_in_cumu_compaction) { - missed_rows_size = missed_rows.size(); - if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION && - _tablet->tablet_state() == TABLET_RUNNING && + if (missed_rows) { + missed_rows_size = missed_rows->size(); + if (_tablet->tablet_state() == TABLET_RUNNING && _stats.merged_rows != missed_rows_size) { std::stringstream ss; ss << "cumulative compaction: the merged rows(" << _stats.merged_rows @@ -930,10 +938,10 @@ Status CompactionMixin::modify_rowsets() { } } - if (config::enable_rowid_conversion_correctness_check) { - RETURN_IF_ERROR(tablet()->check_rowid_conversion(_output_rowset, location_map)); + if (location_map) { + RETURN_IF_ERROR(tablet()->check_rowid_conversion(_output_rowset, *location_map)); + location_map->clear(); } - location_map.clear(); { std::lock_guard wrlock_(tablet()->get_rowset_update_lock()); @@ -960,8 +968,8 @@ Status CompactionMixin::modify_rowsets() { } DeleteBitmap txn_output_delete_bitmap(_tablet->tablet_id()); tablet()->calc_compaction_output_rowset_delete_bitmap( - _input_rowsets, _rowid_conversion, 0, UINT64_MAX, &missed_rows, - &location_map, *it.delete_bitmap.get(), &txn_output_delete_bitmap); + _input_rowsets, _rowid_conversion, 0, UINT64_MAX, missed_rows.get(), + location_map.get(), *it.delete_bitmap.get(), &txn_output_delete_bitmap); if (config::enable_merge_on_write_correctness_check) { RowsetIdUnorderedSet rowsetids; rowsetids.insert(_output_rowset->rowset_id()); @@ -980,21 +988,20 @@ Status CompactionMixin::modify_rowsets() { // Convert the delete bitmap of the input rowsets to output rowset for // incremental data. tablet()->calc_compaction_output_rowset_delete_bitmap( - _input_rowsets, _rowid_conversion, version.second, UINT64_MAX, &missed_rows, - &location_map, _tablet->tablet_meta()->delete_bitmap(), + _input_rowsets, _rowid_conversion, version.second, UINT64_MAX, + missed_rows.get(), location_map.get(), _tablet->tablet_meta()->delete_bitmap(), &output_rowset_delete_bitmap); - if (!_allow_delete_in_cumu_compaction && - compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) { - DCHECK_EQ(missed_rows.size(), missed_rows_size); - if (missed_rows.size() != missed_rows_size) { + if (missed_rows) { + DCHECK_EQ(missed_rows->size(), missed_rows_size); + if (missed_rows->size() != missed_rows_size) { LOG(WARNING) << "missed rows don't match, before: " << missed_rows_size - << " after: " << missed_rows.size(); + << " after: " << missed_rows->size(); } } - if (config::enable_rowid_conversion_correctness_check) { - RETURN_IF_ERROR(tablet()->check_rowid_conversion(_output_rowset, location_map)); + if (location_map) { + RETURN_IF_ERROR(tablet()->check_rowid_conversion(_output_rowset, *location_map)); } tablet()->merge_delete_bitmap(output_rowset_delete_bitmap); diff --git a/be/src/olap/utils.h b/be/src/olap/utils.h index e23e12a1b942ae..6528d2a86a4e3a 100644 --- a/be/src/olap/utils.h +++ b/be/src/olap/utils.h @@ -296,6 +296,8 @@ struct RowLocation { } } }; +using RowLocationSet = std::set; +using RowLocationPairList = std::list>; struct GlobalRowLoacation { GlobalRowLoacation(int64_t tid, RowsetId rsid, uint32_t sid, uint32_t rid) From a560153f4ccc888149dea10f01bddbc791b7c889 Mon Sep 17 00:00:00 2001 From: cambyzhu Date: Thu, 27 Jun 2024 14:52:02 +0800 Subject: [PATCH 2/2] set default enable_missing_rows_correctness_check to false, and open it in pipeline --- be/src/common/config.cpp | 2 +- be/src/olap/compaction.cpp | 2 ++ regression-test/pipeline/external/conf/be.conf | 1 + regression-test/pipeline/p0/conf/be.conf | 1 + regression-test/pipeline/p1/conf/be.conf | 1 + 5 files changed, 6 insertions(+), 1 deletion(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 64c3475e3c1efc..7ef5399d9b2b3f 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1121,7 +1121,7 @@ DEFINE_mBool(enable_mow_compaction_correctness_check_core, "false"); // rowid conversion correctness check when compaction for mow table DEFINE_mBool(enable_rowid_conversion_correctness_check, "false"); // missing rows correctness check when compaction for mow table -DEFINE_mBool(enable_missing_rows_correctness_check, "true"); +DEFINE_mBool(enable_missing_rows_correctness_check, "false"); // When the number of missing versions is more than this value, do not directly // retry the publish and handle it through async publish. DEFINE_mInt32(mow_publish_max_discontinuous_version_num, "20"); diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 30574572e31846..37dcac5283ee98 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -887,10 +887,12 @@ Status CompactionMixin::modify_rowsets() { !_allow_delete_in_cumu_compaction && compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) { missed_rows = std::make_unique(); + LOG(INFO) << "RowLocation Set inited succ for tablet:" << _tablet->tablet_id(); } std::unique_ptr> location_map; if (config::enable_rowid_conversion_correctness_check) { location_map = std::make_unique>(); + LOG(INFO) << "Location Map inited succ for tablet:" << _tablet->tablet_id(); } // Convert the delete bitmap of the input rowsets to output rowset. // New loads are not blocked, so some keys of input rowsets might diff --git a/regression-test/pipeline/external/conf/be.conf b/regression-test/pipeline/external/conf/be.conf index a7c0713d8eb3ad..94a038cfa885fb 100644 --- a/regression-test/pipeline/external/conf/be.conf +++ b/regression-test/pipeline/external/conf/be.conf @@ -58,6 +58,7 @@ user_files_secure_path=/ enable_debug_points=true # debug scanner context dead loop enable_debug_log_timeout_secs=0 +enable_missing_rows_correctness_check=true trino_connector_plugin_dir=/tmp/trino_connector/connectors diff --git a/regression-test/pipeline/p0/conf/be.conf b/regression-test/pipeline/p0/conf/be.conf index d2f4910aa2af24..d5e447e860195e 100644 --- a/regression-test/pipeline/p0/conf/be.conf +++ b/regression-test/pipeline/p0/conf/be.conf @@ -58,6 +58,7 @@ user_files_secure_path=/ enable_debug_points=true # debug scanner context dead loop enable_debug_log_timeout_secs=0 +enable_missing_rows_correctness_check=true trino_connector_plugin_dir=/tmp/trino_connector/connectors diff --git a/regression-test/pipeline/p1/conf/be.conf b/regression-test/pipeline/p1/conf/be.conf index d278b30fb673b5..675518ac0ce464 100644 --- a/regression-test/pipeline/p1/conf/be.conf +++ b/regression-test/pipeline/p1/conf/be.conf @@ -58,6 +58,7 @@ user_files_secure_path=/ enable_debug_points=true # debug scanner context dead loop enable_debug_log_timeout_secs=0 +enable_missing_rows_correctness_check=true enable_jvm_monitor = true