Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 70 additions & 31 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,6 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
<< ", tablet: " << tablet_id() << " rowset: " << rowset_id
<< " seg_id: " << seg->id() << " dummy_version: " << end_version + 1
<< " rows: " << seg->num_rows() << " conflict rows: " << conflict_rows
<< " filtered rows: " << rids_be_overwritten.size()
<< " new generated rows: " << new_generated_rows
<< " bitmap num: " << delete_bitmap->get_delete_bitmap_count()
<< " bitmap cardinality: " << delete_bitmap->cardinality()
Expand Down Expand Up @@ -1005,6 +1004,13 @@ Status BaseTablet::generate_new_block_for_partial_update(
auto old_block = rowset_schema->create_block_by_cids(missing_cids);
auto update_block = rowset_schema->create_block_by_cids(update_cids);

bool have_input_seq_column = false;
if (rowset_schema->has_sequence_col()) {
have_input_seq_column =
(std::find(update_cids.cbegin(), update_cids.cend(),
rowset_schema->sequence_col_idx()) != update_cids.cend());
}

// rowid in the final block(start from 0, increase continuously) -> rowid to read in update_block
std::map<uint32_t, uint32_t> read_index_update;

Expand Down Expand Up @@ -1058,10 +1064,25 @@ Status BaseTablet::generate_new_block_for_partial_update(
// before, even the `strict_mode` is true (which requires partial update
// load job can't insert new keys), this "new" key MUST be written into
// the new generated segment file.
if (new_block_delete_signs != nullptr && new_block_delete_signs[idx]) {
bool use_default = false;
bool new_row_delete_sign =
(new_block_delete_signs != nullptr && new_block_delete_signs[idx]);
bool old_row_delete_sign = (old_block_delete_signs != nullptr &&
old_block_delete_signs[read_index_old[idx]] != 0);
if (old_row_delete_sign) {
if (!rowset_schema->has_sequence_col()) {
use_default = true;
} else if (have_input_seq_column || !rs_column.is_seqeunce_col()) {
// To keep the sequence column value from decreasing, we should read the value of the seq column
// from the old row even if the old row is deleted when the input doesn't specify the sequence column; otherwise
// it may cause the merge-on-read based compaction to produce incorrect results.
use_default = true;
}
}

if (new_row_delete_sign) {
mutable_column->insert_default();
} else if (old_block_delete_signs != nullptr &&
old_block_delete_signs[read_index_old[idx]] != 0) {
} else if (use_default) {
if (rs_column.has_default_value()) {
mutable_column->insert_from(*default_value_block.get_by_position(i).column, 0);
} else if (rs_column.is_nullable()) {
Expand Down Expand Up @@ -1090,6 +1111,10 @@ Status BaseTablet::generate_new_block_for_flexible_partial_update(
vectorized::Block* output_block) {
CHECK(output_block);

int32_t seq_col_unique_id = -1;
if (rowset_schema->has_sequence_col()) {
seq_col_unique_id = rowset_schema->column(rowset_schema->sequence_col_idx()).unique_id();
}
const auto& non_sort_key_cids = partial_update_info->missing_cids;
std::vector<uint32_t> all_cids(rowset_schema->num_columns());
std::iota(all_cids.begin(), all_cids.end(), 0);
Expand Down Expand Up @@ -1139,57 +1164,68 @@ Status BaseTablet::generate_new_block_for_flexible_partial_update(
update_block.get_by_position(skip_bitmap_col_idx).column->get_ptr().get())
->get_data());

VLOG_DEBUG << fmt::format(
"BaseTablet::generate_new_block_for_flexible_partial_update: "
"rids_be_overwritten.size()={}",
rids_be_overwritten.size());
if (rowset_schema->has_sequence_col() && !rids_be_overwritten.empty()) {
int32_t seq_col_unique_id =
rowset_schema->column(rowset_schema->sequence_col_idx()).unique_id();
// If the row specifies the sequence column, we should delete the current row because the
// flexible partial update on the current row has been `overwritten` by the previous one with larger sequence
// column value.
for (auto it = rids_be_overwritten.begin(); it != rids_be_overwritten.end();) {
auto rid = *it;
if (!skip_bitmaps->at(rid).contains(seq_col_unique_id)) {
VLOG_DEBUG << fmt::format(
"BaseTablet::generate_new_block_for_flexible_partial_update: rid={} "
"filtered",
rid);
++it;
} else {
it = rids_be_overwritten.erase(it);
VLOG_DEBUG << fmt::format(
"BaseTablet::generate_new_block_for_flexible_partial_update: rid={} "
"keeped",
rid);
}
}
}

auto fill_one_cell = [&read_index_old](const TabletColumn& tablet_column, std::size_t idx,
vectorized::MutableColumnPtr& new_col,
const vectorized::IColumn& default_value_col,
const vectorized::IColumn& old_value_col,
const vectorized::IColumn& cur_col, bool skipped,
const signed char* delete_sign_column_data) {
auto fill_one_cell = [&read_index_old, &read_index_update, &rowset_schema, partial_update_info](
const TabletColumn& tablet_column, std::size_t idx,
vectorized::MutableColumnPtr& new_col,
const vectorized::IColumn& default_value_col,
const vectorized::IColumn& old_value_col,
const vectorized::IColumn& cur_col, bool skipped,
bool row_has_sequence_col,
const signed char* delete_sign_column_data) {
if (skipped) {
if (delete_sign_column_data != nullptr &&
delete_sign_column_data[read_index_old[cast_set<uint32_t>(idx)]] != 0) {
bool use_default = false;
bool old_row_delete_sign =
(delete_sign_column_data != nullptr &&
delete_sign_column_data[read_index_old[cast_set<uint32_t>(idx)]] != 0);
if (old_row_delete_sign) {
if (!rowset_schema->has_sequence_col()) {
use_default = true;
} else if (row_has_sequence_col ||
(!tablet_column.is_seqeunce_col() &&
(tablet_column.unique_id() !=
partial_update_info->sequence_map_col_uid()))) {
// To keep the sequence column value from decreasing, we should read the values of the seq column (and seq map column)
// from the old row even if the old row is deleted when the input doesn't specify the sequence column; otherwise
// it may cause the merge-on-read based compaction to produce incorrect results.
use_default = true;
}
}
if (use_default) {
if (tablet_column.has_default_value()) {
new_col->insert_from(default_value_col, 0);
} else if (tablet_column.is_nullable()) {
assert_cast<vectorized::ColumnNullable*, TypeCheckOnRelease::DISABLE>(
new_col.get())
->insert_default();
->insert_many_defaults(1);
} else if (tablet_column.is_auto_increment()) {
// For auto-increment column, its default value(generated value) is filled in current block in flush phase
// when the load doesn't specify the auto-increment column
// - if the previous conflicting row is deleted, we should use the value in current block as its final value
// - if the previous conflicting row is an insert, we should use the value in old block as its final value to
// keep consistency between replicas
new_col->insert_from(cur_col, read_index_update[cast_set<uint32_t>(idx)]);
} else {
new_col->insert(tablet_column.get_vec_type()->get_default());
}
} else {
new_col->insert_from(old_value_col, idx);
new_col->insert_from(old_value_col, read_index_old[cast_set<uint32_t>(idx)]);
}
} else {
new_col->insert_from(cur_col, idx);
new_col->insert_from(cur_col, read_index_update[cast_set<uint32_t>(idx)]);
}
};

Expand All @@ -1200,18 +1236,21 @@ Status BaseTablet::generate_new_block_for_flexible_partial_update(
auto col_uid = rs_column.unique_id();
for (auto idx = 0; idx < update_rows; ++idx) {
if (cid < rowset_schema->num_key_columns()) {
new_col->insert_from(cur_col, idx);
new_col->insert_from(cur_col, read_index_update[idx]);
} else {
const vectorized::IColumn& default_value_col =
*default_value_block.get_by_position(cid - rowset_schema->num_key_columns())
.column;
const vectorized::IColumn& old_value_col =
*old_block.get_by_position(cid - rowset_schema->num_key_columns()).column;
if (rids_be_overwritten.contains(idx)) {
new_col->insert_from(old_value_col, idx);
new_col->insert_from(old_value_col, read_index_old[idx]);
} else {
fill_one_cell(rs_column, idx, new_col, default_value_col, old_value_col,
cur_col, skip_bitmaps->at(idx).contains(col_uid),
rowset_schema->has_sequence_col()
? !skip_bitmaps->at(idx).contains(seq_col_unique_id)
: false,
old_block_delete_signs);
}
}
Expand Down
Loading
Loading