109 changes: 106 additions & 3 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -32,11 +32,13 @@
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h" // LOG
#include "common/status.h"
#include "gutil/port.h"
#include "inverted_index_fs_directory.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
#include "olap/data_dir.h"
#include "olap/delete_handler.h"
#include "olap/key_coder.h"
#include "olap/olap_common.h"
#include "olap/partial_update_info.h"
@@ -384,8 +386,27 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
}
}
std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
// locate rows in base data

// init delete predicates for all rowsets
std::vector<RowsetMetaSharedPtr> delete_predicates;
int64_t max_rowset_version = 0;
for (const auto& rs : specified_rowsets) {
if (rs->rowset_meta()->get_rowset_pb().has_delete_predicate()) {
_has_delete_predicate = true;
delete_predicates.push_back(rs->rowset_meta());
max_rowset_version = max_rowset_version >= rs->version().second ? max_rowset_version
: rs->version().second;
}
}
if (_has_delete_predicate) {
_delete_handler = DeleteHandler::create_unique();
RETURN_IF_ERROR(_delete_handler->init(_tablet_schema, delete_predicates, max_rowset_version,
false));
_all_cids.resize(_tablet_schema->num_columns());
std::iota(_all_cids.begin(), _all_cids.end(), 0);
}

// locate rows in base data
int64_t num_rows_filtered = 0;
for (size_t block_pos = data.row_pos; block_pos < data.row_pos + data.num_rows; block_pos++) {
// block segment
Expand Down Expand Up @@ -558,6 +579,20 @@ Status VerticalSegmentWriter::_fill_missing_columns(
std::map<uint32_t, uint32_t> read_index;
size_t read_idx = 0;
for (auto rs_it : _rssid_to_rid) {
size_t start_row_num = old_value_block.rows();
// TODO: clear the block on each loop iteration instead of re-creating it
auto full_schema_predicate_block = _tablet_schema->create_block_by_cids(_all_cids);
vectorized::MutableColumns mutable_full_schema_predicate_columns =
full_schema_predicate_block.mutate_columns();
mutable_full_schema_predicate_columns.resize(_tablet_schema->num_columns());
for (int cid = 0; cid < _tablet_schema->num_columns(); cid++) {
RETURN_IF_CATCH_EXCEPTION(
mutable_full_schema_predicate_columns[cid] = Schema::get_predicate_column_ptr(
_tablet_schema->column(cid).type() == FieldType::OLAP_FIELD_TYPE_CHAR
? FieldType::OLAP_FIELD_TYPE_CHAR
: _tablet_schema->column(cid).type(),
_tablet_schema->column(cid).is_nullable(), ReaderType::UNKNOWN));
}
for (auto seg_it : rs_it.second) {
auto rowset = _rsid_to_rowset[rs_it.first];
CHECK(rowset);
@@ -573,6 +608,15 @@
LOG(WARNING) << "failed to fetch value through row column";
return st;
}
if (_has_delete_predicate) {
auto st = tablet->fetch_value_through_row_column(rowset, *_tablet_schema,
seg_it.first, rids, _all_cids,
full_schema_predicate_block);
if (!st.ok()) {
LOG(WARNING) << "failed to fetch full schema value through row column";
return st;
}
}
continue;
}
for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) {
@@ -585,6 +629,64 @@
return st;
}
}
if (_has_delete_predicate) {
for (size_t cid = 0; cid < mutable_full_schema_predicate_columns.size(); ++cid) {
TabletColumn tablet_column = _tablet_schema->column(_all_cids[cid]);
auto st = tablet->fetch_value_by_rowids(
rowset, seg_it.first, rids, tablet_column,
mutable_full_schema_predicate_columns[cid]);
// set read value to output block
if (!st.ok()) {
LOG(WARNING) << "failed to fetch full schema value by rowids";
return st;
}
}
}
}
if (_has_delete_predicate) {
// Get and parse the delete predicate from each rowset.
// Use 'vector<bool> _delete_predicate_rows' to record rows deleted by delete predicates.
// Here's an example. We perform 3 operations. The table schema is: int k1, varchar v1 (nullable).
// 1. Insert into table values(1,'a');
// 2. Insert into table values(2,'b');
// 3. Delete from table where v1='b';
// Now, the rowsets are as follows:
// rowset1 ID:1 Version:[1-1] Type:DATA contains (1,'a')
// rowset2 ID:2 Version:[2-2] Type:DATA contains (2,'b')
// rowset3 ID:3 Version:[3-3] Type:DELETE with the delete predicate: v1='b'
// For a partial update, we first read all old data. Then we compare the new data with the old data.
// If the key of the new data exists, we update only the specified columns of the new data.
// If the key does not exist, we insert the new data.
// In the example above, if we perform a partial update: 4. Insert into table (k1) values(2);
// We must consider the delete predicate, which indicates that data where k1=2 does not exist (deleted by step3).
// Therefore, after reading all rowsets, our data block shows (1,'a')(2,'b').
// Given the delete predicate v1='b', we mark data (2,'b') as deleted.
// _delete_predicate_rows will be [False, True]. 'False' means (1,'a') exists.
// 'True' means (2,'b') does not exist. For the new partial update,
// we understand that key '2' does not exist, so we ignore the old value and write (2, NULL) into the table
// because the nullable column v1 will automatically be filled with 'NULL'.
// Question: Why not just delete (2,'b') in the data block?
// Answer: For historical reasons. Before reading the data block, we identify the number of rows needed
// based on the primary key index. Some preparatory work depends on this row count.
// Therefore, if we delete (2,'b') directly in the block, some checks, such as
// CHECK(row_num_need_to_read == block.rows()), will fail.

std::shared_ptr<AndBlockColumnPredicate> delete_condition_predicates =
AndBlockColumnPredicate::create_shared();
std::unordered_map<int32_t, std::vector<const ColumnPredicate*>>
del_predicates_for_zone_map;
_delete_handler->get_delete_conditions_after_version(
_rsid_to_rowset[rs_it.first]->end_version(), delete_condition_predicates.get(),
&del_predicates_for_zone_map);
std::vector<uint16_t> select_rowid_idx_vec;
uint16_t selected_size = 0;
selected_size = delete_condition_predicates->evaluate(
mutable_full_schema_predicate_columns, select_rowid_idx_vec.data(),
selected_size);
_delete_predicate_rows.resize(old_value_block.rows(), true);
for (size_t i = 0; i < selected_size; i++) {
_delete_predicate_rows[start_row_num + select_rowid_idx_vec[i] - 1] = false;
}
}
}
// build default value columns
@@ -600,7 +702,7 @@ Status VerticalSegmentWriter::_fill_missing_columns(
delete_sign_column_data = delete_sign_col.get_data().data();
}

if (has_default_or_nullable || delete_sign_column_data != nullptr) {
if (has_default_or_nullable || delete_sign_column_data != nullptr || _has_delete_predicate) {
for (auto i = 0; i < missing_cids.size(); ++i) {
const auto& column = _tablet_schema->column(missing_cids[i]);
if (column.has_default_value()) {
@@ -645,7 +747,8 @@ Status VerticalSegmentWriter::_fill_missing_columns(
// to check if a row REALLY exists in the table.
if (use_default_or_null_flag[idx] ||
(delete_sign_column_data != nullptr &&
delete_sign_column_data[read_index[idx + segment_start_pos]] != 0)) {
delete_sign_column_data[read_index[idx + segment_start_pos]] != 0) ||
(_has_delete_predicate && _delete_predicate_rows[idx])) {
for (auto i = 0; i < missing_cids.size(); ++i) {
// if the column has default value, fill it with default value
// otherwise, if the column is nullable, fill it with null value
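The comment block in _fill_missing_columns above describes the marking scheme in prose. Below is a minimal, self-contained sketch of the same idea using simplified, hypothetical types (OldRow, DeletePredicate); it does not use Doris's actual DeleteHandler or AndBlockColumnPredicate interfaces and is only meant to illustrate why matching rows are flagged rather than removed from the old-value block.

```cpp
// Minimal sketch (not Doris's real DeleteHandler / AndBlockColumnPredicate API):
// rows whose old values match a delete predicate are only *marked* as deleted,
// so the old-value block keeps its row count and later row-count checks hold.
#include <cstdint>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

struct OldRow {
    int64_t k1;
    std::string v1;
};

// Hypothetical stand-in for a parsed delete predicate such as "v1 = 'b'".
using DeletePredicate = std::function<bool(const OldRow&)>;

// Returns one flag per old row: true -> logically deleted, so the partial
// update ignores the old values and fills defaults/NULL instead;
// false -> the row still exists and missing columns are copied from it.
std::vector<bool> mark_deleted_rows(const std::vector<OldRow>& old_rows,
                                    const DeletePredicate& pred) {
    std::vector<bool> deleted(old_rows.size(), false);
    for (size_t i = 0; i < old_rows.size(); ++i) {
        deleted[i] = pred(old_rows[i]);
    }
    return deleted;
}

int main() {
    // Old data read for the keys touched by the partial update: (1,'a') and (2,'b').
    std::vector<OldRow> old_rows = {{1, "a"}, {2, "b"}};
    // DELETE FROM t WHERE v1 = 'b';
    auto deleted = mark_deleted_rows(
            old_rows, [](const OldRow& r) { return r.v1 == "b"; });
    // deleted == {false, true}: key 2 is treated as non-existent, so a partial
    // update "INSERT INTO t (k1) VALUES (2)" writes (2, NULL) for nullable v1.
    for (bool d : deleted) {
        std::cout << (d ? "deleted" : "exists") << '\n';
    }
    return 0;
}
```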
4 changes: 4 additions & 0 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.h
@@ -195,6 +195,10 @@ class VerticalSegmentWriter {
std::map<RowsetId, RowsetSharedPtr> _rsid_to_rowset;

std::vector<RowsInBlock> _batched_blocks;
bool _has_delete_predicate = false;
std::unique_ptr<DeleteHandler> _delete_handler = nullptr;
std::vector<bool> _delete_predicate_rows;
std::vector<uint32_t> _all_cids;
};

} // namespace segment_v2
@@ -41,6 +41,20 @@
4 4 4 4 4 0
5 5 5 5 5 0

-- !sql --
1 1 1
2 2 2
3 3 3
4 4 4
5 5 5

-- !sql --
1 1 \N
2 2 \N
3 3 \N
4 4 4
5 5 5

-- !sql --
1 1 1 1 1
2 2 2 2 2
@@ -83,3 +97,17 @@
4 4 4 4 4 0
5 5 5 5 5 0

-- !sql --
1 1 1
2 2 2
3 3 3
4 4 4
5 5 5

-- !sql --
1 1 \N
2 2 \N
3 3 \N
4 4 4
5 5 5

@@ -106,6 +106,30 @@ suite('test_partial_update_delete') {
sql "set skip_delete_bitmap=true;"
qt_sql "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName3} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
sql "drop table if exists ${tableName3};"

sql "set skip_delete_sign=false;"
sql "set skip_storage_engine_merge=false;"
sql "set skip_delete_bitmap=false;"
def tableName4 = "test_partial_update_delete4"
sql "DROP TABLE IF EXISTS ${tableName4};"
sql """ CREATE TABLE IF NOT EXISTS ${tableName4} (
`k1` int,
`c1` int,
`c2` int
)UNIQUE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1
PROPERTIES (
"enable_unique_key_merge_on_write" = "true",
"disable_auto_compaction" = "true",
"replication_num" = "1",
"store_row_column" = "${use_row_store}"); """
sql "insert into ${tableName4} values(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5);"
qt_sql "select * from ${tableName4} order by k1;"
sql "delete from ${tableName4} where k1=1;"
sql "set enable_unique_key_partial_update=true;"
sql "set enable_insert_strict=false;"
sql "insert into ${tableName4} (k1, c1) values (1,1),(2,2),(3,3);"
qt_sql "select * from ${tableName4} order by k1;"
}
}
}