From 00b5f8af84c4563e66b0d8a6b6573d961cdca17b Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Sun, 2 Jun 2024 01:42:42 +0800 Subject: [PATCH 1/3] 1 --- .../segment_v2/vertical_segment_writer.cpp | 74 ++++++++++++++++++- .../segment_v2/vertical_segment_writer.h | 3 + 2 files changed, 75 insertions(+), 2 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 448a2b7304c5fa..e970613b32036f 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -32,11 +32,13 @@ #include "common/compiler_util.h" // IWYU pragma: keep #include "common/config.h" #include "common/logging.h" // LOG +#include "common/status.h" #include "gutil/port.h" #include "inverted_index_fs_directory.h" #include "io/fs/file_writer.h" #include "io/fs/local_file_system.h" #include "olap/data_dir.h" +#include "olap/delete_handler.h" #include "olap/key_coder.h" #include "olap/olap_common.h" #include "olap/partial_update_info.h" @@ -384,8 +386,25 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da } } std::vector> segment_caches(specified_rowsets.size()); - // locate rows in base data + // init delete predicates for all rowsets + std::vector delete_predicates; + int64_t max_rowset_version = 0; + for (const auto& rs : specified_rowsets) { + if (rs->rowset_meta()->get_rowset_pb().has_delete_predicate()) { + _has_delete_predicate = true; + delete_predicates.push_back(rs->rowset_meta()); + max_rowset_version = max_rowset_version >= rs->version().second ? max_rowset_version + : rs->version().second; + } + } + if (_has_delete_predicate) { + _delete_handler = DeleteHandler::create_unique(); + RETURN_IF_ERROR(_delete_handler->init(_tablet_schema, delete_predicates, max_rowset_version, + false)); + } + + // locate rows in base data int64_t num_rows_filtered = 0; for (size_t block_pos = data.row_pos; block_pos < data.row_pos + data.num_rows; block_pos++) { // block segment @@ -558,6 +577,7 @@ Status VerticalSegmentWriter::_fill_missing_columns( std::map read_index; size_t read_idx = 0; for (auto rs_it : _rssid_to_rid) { + size_t start_row_num = old_value_block.rows(); for (auto seg_it : rs_it.second) { auto rowset = _rsid_to_rowset[rs_it.first]; CHECK(rowset); @@ -586,6 +606,55 @@ Status VerticalSegmentWriter::_fill_missing_columns( } } } + if (_has_delete_predicate) { + // Get and parse the delete predicate from each rowset. + // Use 'vector _delete_predicate_rows' to record rows deleted by delete predicates. + // Here's an example. We perform 3 operations. The table schema is: int k1, varchar v1 (nullable). + // 1. Insert into table values(1,'a'); + // 2. Insert into table values(2,'b'); + // 3. Delete from table where v1='b'; + // Now, the rowsets are as follows: + // rowset1 ID:1 Version:[1-1] Type:DATA contains (1,'a') + // rowset2 ID:2 Version:[2-2] Type:DATA contains (2,'b') + // rowset3 ID:3 Version:[3-3] Type:DELETE with the delete predicate: v1='b' + // For a partial update, we first read all old data. Then we compare the new data with the old data. + // If the key of the new data exists, we update only the specified columns of the new data. + // If the key does not exist, we insert the new data. + // In the example above, if we perform a partial update: 4. Insert into table (k1) values(2); + // We must consider the delete predicate, which indicates that data where k1=2 does not exist (deleted by step3). + // Therefore, after reading all rowsets, our data block shows (1,'a')(2,'b'). + // Given the delete predicate v1='b', we mark data (2,'b') as deleted. + // _delete_predicate_rows will be [False, True]. 'False' means (1,'a') exists. + // 'True' means (2,'b') does not exist. For the new partial update, + // we understand that key '2' does not exist, so we ignore the old value and write (2, NULL) into the table + // because the nullable column v1 will automatically be filled with 'NULL'. + // Question: Why not just delete (2,'b') in the data block? + // Answer: For historical reasons. Before reading the data block, we identify the number of rows needed + // based on the primary key index. Some preparatory work depends on this row count. + // Therefore, if we delete (2,'b') directly in the block, some checks, such as + // CHECK(row_num_need_to_read = block.rows()), will fail. + + std::unique_ptr delete_pre_block = + vectorized::MutableBlock::create_unique(old_value_block.clone_empty()); + RETURN_IF_ERROR(delete_pre_block->add_rows(&old_value_block, start_row_num - 1, + old_value_block.rows() - start_row_num)); + std::shared_ptr delete_condition_predicates = + AndBlockColumnPredicate::create_shared(); + std::unordered_map> + del_predicates_for_zone_map; + _delete_handler->get_delete_conditions_after_version( + _rsid_to_rowset[rs_it.first]->end_version(), delete_condition_predicates.get(), + &del_predicates_for_zone_map); + std::vector select_rowid_idx_vec; + uint16_t selected_size = 0; + selected_size = delete_condition_predicates->evaluate( + delete_pre_block->mutable_columns(), select_rowid_idx_vec.data(), + selected_size); + _delete_predicate_rows.resize(old_value_block.rows()); + for (size_t i = 0; i < selected_size; i++) { + _delete_predicate_rows[start_row_num + select_rowid_idx_vec[i] - 1] = true; + } + } } // build default value columns auto default_value_block = old_value_block.clone_empty(); @@ -645,7 +714,8 @@ Status VerticalSegmentWriter::_fill_missing_columns( // to check if a row REALLY exists in the table. if (use_default_or_null_flag[idx] || (delete_sign_column_data != nullptr && - delete_sign_column_data[read_index[idx + segment_start_pos]] != 0)) { + delete_sign_column_data[read_index[idx + segment_start_pos]] != 0) || + (_has_delete_predicate && _delete_predicate_rows[idx])) { for (auto i = 0; i < missing_cids.size(); ++i) { // if the column has default value, fill it with default value // otherwise, if the column is nullable, fill it with null value diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h index 6c853aa185f430..0c12dbebd84d45 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h @@ -195,6 +195,9 @@ class VerticalSegmentWriter { std::map _rsid_to_rowset; std::vector _batched_blocks; + bool _has_delete_predicate = false; + std::unique_ptr _delete_handler = nullptr; + std::vector _delete_predicate_rows; }; } // namespace segment_v2 From e5cf1344304f356ead757930289f6f2cf3c15c30 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Mon, 3 Jun 2024 22:11:16 +0800 Subject: [PATCH 2/3] 2 --- .../segment_v2/vertical_segment_writer.cpp | 49 ++++++++++++++++--- .../segment_v2/vertical_segment_writer.h | 1 + 2 files changed, 42 insertions(+), 8 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index e970613b32036f..937864040e36ec 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -402,6 +402,8 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da _delete_handler = DeleteHandler::create_unique(); RETURN_IF_ERROR(_delete_handler->init(_tablet_schema, delete_predicates, max_rowset_version, false)); + _all_cids.resize(_tablet_schema->num_columns()); + std::iota(_all_cids.begin(), _all_cids.end(), 0); } // locate rows in base data @@ -578,6 +580,19 @@ Status VerticalSegmentWriter::_fill_missing_columns( size_t read_idx = 0; for (auto rs_it : _rssid_to_rid) { size_t start_row_num = old_value_block.rows(); + // todo: clear block every loop + auto full_schema_predicate_block = _tablet_schema->create_block_by_cids(_all_cids); + vectorized::MutableColumns mutable_full_schema_predicate_columns = + full_schema_predicate_block.mutate_columns(); + mutable_full_schema_predicate_columns.resize(_tablet_schema->num_columns()); + for (int cid = 0; cid < _tablet_schema->num_columns(); cid++) { + RETURN_IF_CATCH_EXCEPTION( + mutable_full_schema_predicate_columns[cid] = Schema::get_predicate_column_ptr( + _tablet_schema->column(cid).type() == FieldType::OLAP_FIELD_TYPE_CHAR + ? FieldType::OLAP_FIELD_TYPE_CHAR + : _tablet_schema->column(cid).type(), + _tablet_schema->column(cid).is_nullable(), ReaderType::UNKNOWN)); + } for (auto seg_it : rs_it.second) { auto rowset = _rsid_to_rowset[rs_it.first]; CHECK(rowset); @@ -593,6 +608,15 @@ Status VerticalSegmentWriter::_fill_missing_columns( LOG(WARNING) << "failed to fetch value through row column"; return st; } + if (_has_delete_predicate) { + auto st = tablet->fetch_value_through_row_column(rowset, *_tablet_schema, + seg_it.first, rids, _all_cids, + full_schema_predicate_block); + if (!st.ok()) { + LOG(WARNING) << "failed to fetch full schema value through row column"; + return st; + } + } continue; } for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) { @@ -605,6 +629,19 @@ Status VerticalSegmentWriter::_fill_missing_columns( return st; } } + if (_has_delete_predicate) { + for (size_t cid = 0; cid < mutable_full_schema_predicate_columns.size(); ++cid) { + TabletColumn tablet_column = _tablet_schema->column(_all_cids[cid]); + auto st = tablet->fetch_value_by_rowids( + rowset, seg_it.first, rids, tablet_column, + mutable_full_schema_predicate_columns[cid]); + // set read value to output block + if (!st.ok()) { + LOG(WARNING) << "failed to fetch full schema value by rowids"; + return st; + } + } + } } if (_has_delete_predicate) { // Get and parse the delete predicate from each rowset. @@ -634,10 +671,6 @@ Status VerticalSegmentWriter::_fill_missing_columns( // Therefore, if we delete (2,'b') directly in the block, some checks, such as // CHECK(row_num_need_to_read = block.rows()), will fail. - std::unique_ptr delete_pre_block = - vectorized::MutableBlock::create_unique(old_value_block.clone_empty()); - RETURN_IF_ERROR(delete_pre_block->add_rows(&old_value_block, start_row_num - 1, - old_value_block.rows() - start_row_num)); std::shared_ptr delete_condition_predicates = AndBlockColumnPredicate::create_shared(); std::unordered_map> @@ -648,11 +681,11 @@ Status VerticalSegmentWriter::_fill_missing_columns( std::vector select_rowid_idx_vec; uint16_t selected_size = 0; selected_size = delete_condition_predicates->evaluate( - delete_pre_block->mutable_columns(), select_rowid_idx_vec.data(), + mutable_full_schema_predicate_columns, select_rowid_idx_vec.data(), selected_size); - _delete_predicate_rows.resize(old_value_block.rows()); + _delete_predicate_rows.resize(old_value_block.rows(), true); for (size_t i = 0; i < selected_size; i++) { - _delete_predicate_rows[start_row_num + select_rowid_idx_vec[i] - 1] = true; + _delete_predicate_rows[start_row_num + select_rowid_idx_vec[i] - 1] = false; } } } @@ -669,7 +702,7 @@ Status VerticalSegmentWriter::_fill_missing_columns( delete_sign_column_data = delete_sign_col.get_data().data(); } - if (has_default_or_nullable || delete_sign_column_data != nullptr) { + if (has_default_or_nullable || delete_sign_column_data != nullptr || _has_delete_predicate) { for (auto i = 0; i < missing_cids.size(); ++i) { const auto& column = _tablet_schema->column(missing_cids[i]); if (column.has_default_value()) { diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h index 0c12dbebd84d45..d208d741a969a6 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h @@ -198,6 +198,7 @@ class VerticalSegmentWriter { bool _has_delete_predicate = false; std::unique_ptr _delete_handler = nullptr; std::vector _delete_predicate_rows; + std::vector _all_cids; }; } // namespace segment_v2 From b67e7774a43ecd8416b728c5d348ca0ce808e0b5 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Mon, 3 Jun 2024 23:14:22 +0800 Subject: [PATCH 3/3] 3 --- .../test_partial_update_delete.out | 28 +++++++++++++++++++ .../test_partial_update_delete.groovy | 24 ++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete.out index 0863afd7931780..53d077bbebb1cc 100644 --- a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete.out +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_delete.out @@ -41,6 +41,20 @@ 4 4 4 4 4 0 5 5 5 5 5 0 +-- !sql -- +1 1 1 +2 2 2 +3 3 3 +4 4 4 +5 5 5 + +-- !sql -- +1 1 \N +2 2 \N +3 3 \N +4 4 4 +5 5 5 + -- !sql -- 1 1 1 1 1 2 2 2 2 2 @@ -83,3 +97,17 @@ 4 4 4 4 4 0 5 5 5 5 5 0 +-- !sql -- +1 1 1 +2 2 2 +3 3 3 +4 4 4 +5 5 5 + +-- !sql -- +1 1 \N +2 2 \N +3 3 \N +4 4 4 +5 5 5 + diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete.groovy index 38720646b2b533..adfc1b5fc3c122 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_delete.groovy @@ -106,6 +106,30 @@ suite('test_partial_update_delete') { sql "set skip_delete_bitmap=true;" qt_sql "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName3} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;" sql "drop table if exists ${tableName3};" + + sql "set skip_delete_sign=false;" + sql "set skip_storage_engine_merge=false;" + sql "set skip_delete_bitmap=false;" + def tableName4 = "test_partial_update_delete4" + sql "DROP TABLE IF EXISTS ${tableName4};" + sql """ CREATE TABLE IF NOT EXISTS ${tableName4} ( + `k1` int, + `c1` int, + `c2` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1", + "store_row_column" = "${use_row_store}"); """ + sql "insert into ${tableName4} values(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5);" + qt_sql "select * from ${tableName4} order by k1;" + sql "delete from ${tableName4} where k1=1;" + sql "set enable_unique_key_partial_update=true;" + sql "set enable_insert_strict=false;" + sql "insert into ${tableName4} (k1, c1) values (1,1),(2,2),(3,3);" + qt_sql "select * from ${tableName4} order by k1;" } } }