Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 46 additions & 101 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "olap/calc_delete_bitmap_executor.h"
#include "olap/delete_bitmap_calculator.h"
#include "olap/memtable.h"
#include "olap/partial_update_info.h"
#include "olap/primary_key_index.h"
#include "olap/rowid_conversion.h"
#include "olap/rowset/beta_rowset.h"
Expand Down Expand Up @@ -56,55 +57,6 @@ bvar::LatencyRecorder g_tablet_update_delete_bitmap_latency("doris_pk", "update_

static bvar::Adder<size_t> g_total_tablet_num("doris_total_tablet_num");

// Read columns by read plan.
//
// Walks `read_plan` (rowset id -> segment id -> list of {rid, pos}) and fetches the
// columns `cids_to_read` for every listed row into `block`.
//
// tablet_schema:  schema used to resolve column ids; also decides whether values are
//                 fetched through the row-store column or column by column.
// cids_to_read:   column ids to read; entry `i` is fetched into column `i` of `block`.
// rsid_to_rowset: must contain every rowset id present in `read_plan` (enforced by CHECK).
// read_index:     output, filled as pos -> index of the row in read order
//                 (ori_pos -> block_idx).
// skip_map:       optional; when non-null, rows whose `skip_map[pos]` is non-zero are
//                 neither read nor recorded in `read_index`.
//
// Returns the first non-OK status reported by the fetch helpers, otherwise Status::OK().
Status read_columns_by_plan(TabletSchemaSPtr tablet_schema,
                            const std::vector<uint32_t> cids_to_read,
                            const PartialUpdateReadPlan& read_plan,
                            const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
                            vectorized::Block& block, std::map<uint32_t, uint32_t>* read_index,
                            const signed char* __restrict skip_map = nullptr) {
    const bool has_row_column = tablet_schema->has_row_store_for_all_columns();
    auto mutable_columns = block.mutate_columns();
    size_t read_idx = 0;
    // Bind the plan entries by const reference: iterating by value would deep-copy the
    // nested map / vector of every rowset and segment on each iteration.
    for (const auto& [rowset_id, segid_to_rows] : read_plan) {
        for (const auto& [segment_id, rows] : segid_to_rows) {
            auto rowset_iter = rsid_to_rowset.find(rowset_id);
            CHECK(rowset_iter != rsid_to_rowset.end());

            // Collect the row ids that still have to be read from this segment and
            // remember where each of them lands in read order.
            std::vector<uint32_t> rids;
            rids.reserve(rows.size());
            for (auto [rid, pos] : rows) {
                if (skip_map && skip_map[pos]) {
                    continue;
                }
                rids.emplace_back(rid);
                (*read_index)[pos] = read_idx++;
            }

            if (has_row_column) {
                auto st = BaseTablet::fetch_value_through_row_column(rowset_iter->second,
                                                                     *tablet_schema, segment_id,
                                                                     rids, cids_to_read, block);
                if (!st.ok()) {
                    LOG(WARNING) << "failed to fetch value through row column";
                    return st;
                }
                continue;
            }

            for (size_t cid = 0; cid < mutable_columns.size(); ++cid) {
                // Bind by reference instead of copying the column descriptor per column.
                const auto& tablet_column = tablet_schema->column(cids_to_read[cid]);
                auto st = BaseTablet::fetch_value_by_rowids(rowset_iter->second, segment_id, rids,
                                                            tablet_column, mutable_columns[cid]);
                // set read value to output block
                if (!st.ok()) {
                    LOG(WARNING) << "failed to fetch value";
                    return st;
                }
            }
        }
    }
    block.set_columns(std::move(mutable_columns));
    return Status::OK();
}

Status _get_segment_column_iterator(const BetaRowsetSharedPtr& rowset, uint32_t segid,
const TabletColumn& target_column,
SegmentCacheHandle* segment_cache_handle,
Expand Down Expand Up @@ -559,27 +511,6 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, bool with_seq_col,
return Status::Error<ErrorCode::KEY_NOT_FOUND>("can't find key in all rowsets");
}

// Registers one row in `read_plan`: the row `row_location.row_id` of segment
// `row_location.segment_id` in rowset `row_location.rowset_id`, paired with `pos`.
//
// The plan is keyed rowset id -> segment id -> list of {rid, pos}; the entry is appended
// to the list for that rowset/segment, creating the intermediate containers on first use.
void BaseTablet::prepare_to_read(const RowLocation& row_location, size_t pos,
                                 PartialUpdateReadPlan* read_plan) {
    // std::map::operator[] default-constructs the per-rowset map and the per-segment
    // vector when they are missing, which replaces the manual find/emplace chain and
    // avoids copying temporary containers into the plan.
    auto& rows_of_segment = (*read_plan)[row_location.rowset_id][row_location.segment_id];
    rows_of_segment.emplace_back(RidAndPos {row_location.row_id, pos});
}

// If the user passes a token, all calculation work is submitted to a thread pool,
// and the user can get all delete bitmaps from that token.
// If `token` is nullptr, the calculation runs locally, and the user can get the result
Expand Down Expand Up @@ -758,8 +689,8 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
// So here we should read version 5's columns and build a new row, which
// consists of version 6's updated columns and version 5's original columns.
// Here we build two read plans: one for the original values and one for the updated values.
prepare_to_read(loc, pos, &read_plan_ori);
prepare_to_read(RowLocation {rowset_id, seg->id(), row_id}, pos, &read_plan_update);
read_plan_ori.prepare_to_read(loc, pos);
read_plan_update.prepare_to_read(RowLocation {rowset_id, seg->id(), row_id}, pos);
rsid_to_rowset[rowset_find->rowset_id()] = rowset_find;
++pos;
// delete bitmap will be calculate when memtable flush and
Expand Down Expand Up @@ -930,6 +861,40 @@ Status BaseTablet::fetch_value_by_rowids(RowsetSharedPtr input_rowset, uint32_t
return Status::OK();
}

// Returns a pointer to the raw values of the DELETE_SIGN column of `block`, or nullptr
// when the block has no column with that name or the column holds fewer than
// `rows_at_least` rows.
//
// NOTE(review): the column is reinterpreted as vectorized::ColumnInt8 without a type
// check — presumably the delete sign column is always Int8; confirm against the schema.
const signed char* BaseTablet::get_delete_sign_column_data(vectorized::Block& block,
                                                           size_t rows_at_least) {
    const vectorized::ColumnWithTypeAndName* delete_sign_column =
            block.try_get_by_name(DELETE_SIGN);
    if (delete_sign_column == nullptr) {
        return nullptr;
    }

    const auto& delete_sign_col =
            reinterpret_cast<const vectorized::ColumnInt8&>(*(delete_sign_column->column));
    if (delete_sign_col.size() < rows_at_least) {
        // Not enough rows for the caller to index safely.
        return nullptr;
    }
    return delete_sign_col.get_data().data();
}

// Fills `default_value_block` with one parsed default value per column that declares one.
//
// schema:              schema used to look up each column by id.
// cids:                column ids; entry `i` corresponds to column `i` of `ref_block`
//                      and of `default_value_block`.
// default_values:      textual default values, indexed like `cids`; only read for
//                      columns where `has_default_value()` is true.
// ref_block:           supplies the data type used to parse each default value.
// default_value_block: output block; its columns are taken out, appended to and set back.
//
// Returns the first error reported by `from_string`, otherwise Status::OK(). On error
// the columns taken out of `default_value_block` are not set back.
Status BaseTablet::generate_default_value_block(const TabletSchema& schema,
                                                const std::vector<uint32_t>& cids,
                                                const std::vector<std::string>& default_values,
                                                const vectorized::Block& ref_block,
                                                vectorized::Block& default_value_block) {
    auto mutable_default_value_columns = default_value_block.mutate_columns();
    // size_t index: `auto i = 0` would deduce int and compare signed against unsigned.
    for (size_t i = 0; i < cids.size(); ++i) {
        const auto& column = schema.column(cids[i]);
        if (!column.has_default_value()) {
            continue;
        }
        const auto& default_value = default_values[i];
        // ReadBuffer takes a mutable char*, hence the const_cast on the string data.
        vectorized::ReadBuffer rb(const_cast<char*>(default_value.c_str()),
                                  default_value.size());
        RETURN_IF_ERROR(ref_block.get_by_position(i).type->from_string(
                rb, mutable_default_value_columns[i].get()));
    }
    default_value_block.set_columns(std::move(mutable_default_value_columns));
    return Status::OK();
}

Status BaseTablet::generate_new_block_for_partial_update(
TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info,
const PartialUpdateReadPlan& read_plan_ori, const PartialUpdateReadPlan& read_plan_update,
Expand All @@ -947,27 +912,13 @@ Status BaseTablet::generate_new_block_for_partial_update(
auto old_block = rowset_schema->create_block_by_cids(missing_cids);
auto update_block = rowset_schema->create_block_by_cids(update_cids);

auto get_delete_sign_column_data = [](vectorized::Block& block,
size_t rows) -> const signed char* {
if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
block.try_get_by_name(DELETE_SIGN);
delete_sign_column != nullptr) {
const auto& delete_sign_col =
reinterpret_cast<const vectorized::ColumnInt8&>(*(delete_sign_column->column));
if (delete_sign_col.size() >= rows) {
return delete_sign_col.get_data().data();
}
}
return nullptr;
};

// rowid in the final block(start from 0, increase continuously) -> rowid to read in update_block
std::map<uint32_t, uint32_t> read_index_update;

// read current rowset first, if a row in the current rowset has delete sign mark
// we don't need to read values from old block
RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, update_cids, read_plan_update,
rsid_to_rowset, update_block, &read_index_update));
RETURN_IF_ERROR(read_plan_update.read_columns_by_plan(
*rowset_schema, update_cids, rsid_to_rowset, update_block, &read_index_update));
size_t update_rows = read_index_update.size();
for (auto i = 0; i < update_cids.size(); ++i) {
for (auto idx = 0; idx < update_rows; ++idx) {
Expand All @@ -986,27 +937,21 @@ Status BaseTablet::generate_new_block_for_partial_update(

// rowid in the final block (starts from 0 and increases, but may not be continuous because we skip reading some rows) -> rowid to read in old_block
std::map<uint32_t, uint32_t> read_index_old;
RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, missing_cids, read_plan_ori, rsid_to_rowset,
old_block, &read_index_old, new_block_delete_signs));
RETURN_IF_ERROR(read_plan_ori.read_columns_by_plan(*rowset_schema, missing_cids, rsid_to_rowset,
old_block, &read_index_old,
new_block_delete_signs));
size_t old_rows = read_index_old.size();
const auto* __restrict old_block_delete_signs =
get_delete_sign_column_data(old_block, old_rows);

// build default value block
auto default_value_block = old_block.clone_empty();
auto mutable_default_value_columns = default_value_block.mutate_columns();
if (old_block_delete_signs != nullptr || new_block_delete_signs != nullptr) {
for (auto i = 0; i < missing_cids.size(); ++i) {
const auto& column = rowset_schema->column(missing_cids[i]);
if (column.has_default_value()) {
const auto& default_value = partial_update_info->default_values[i];
vectorized::ReadBuffer rb(const_cast<char*>(default_value.c_str()),
default_value.size());
RETURN_IF_ERROR(old_block.get_by_position(i).type->from_string(
rb, mutable_default_value_columns[i].get()));
}
}
RETURN_IF_ERROR(BaseTablet::generate_default_value_block(
*rowset_schema, missing_cids, partial_update_info->default_values, old_block,
default_value_block));
}
auto mutable_default_value_columns = default_value_block.mutate_columns();

CHECK(update_rows >= old_rows);

Expand Down
15 changes: 11 additions & 4 deletions be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include "common/status.h"
#include "olap/iterators.h"
#include "olap/olap_common.h"
#include "olap/partial_update_info.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/tablet_fwd.h"
#include "olap/tablet_meta.h"
Expand All @@ -39,6 +38,8 @@ class RowsetWriter;
class CalcDeleteBitmapToken;
class SegmentCacheHandle;
class RowIdConversion;
struct PartialUpdateInfo;
class PartialUpdateReadPlan;

struct TabletWithVersion {
BaseTabletSPtr tablet;
Expand Down Expand Up @@ -150,9 +151,6 @@ class BaseTablet {
std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
RowsetSharedPtr* rowset = nullptr, bool with_rowid = true);

static void prepare_to_read(const RowLocation& row_location, size_t pos,
PartialUpdateReadPlan* read_plan);

// calc delete bitmap when flush memtable, use a fake version to calc
// For example, cur max version is 5, and we use version 6 to calc but
// finally this rowset publish version with 8, we should make up data
Expand Down Expand Up @@ -189,6 +187,15 @@ class BaseTablet {
int64_t txn_id, const RowsetIdUnorderedSet& rowset_ids,
std::vector<RowsetSharedPtr>* rowsets = nullptr);

static const signed char* get_delete_sign_column_data(vectorized::Block& block,
size_t rows_at_least = 0);

static Status generate_default_value_block(const TabletSchema& schema,
const std::vector<uint32_t>& cids,
const std::vector<std::string>& default_values,
const vectorized::Block& ref_block,
vectorized::Block& default_value_block);

static Status generate_new_block_for_partial_update(
TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info,
const PartialUpdateReadPlan& read_plan_ori,
Expand Down
16 changes: 4 additions & 12 deletions be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <typeinfo>
#include <unordered_map>
#include <unordered_set>
#include <utility>

#include "io/io_common.h"
#include "olap/olap_define.h"
Expand Down Expand Up @@ -508,28 +509,19 @@ class DeleteBitmap;
// merge on write context
// Bundles the values handed to the constructor: a max version, a txn id, a set of
// rowset ids, a vector of rowset pointers and a shared DeleteBitmap.
// NOTE(review): two variants of the second constructor parameter line and of the
// trailing member initializers appear below (const-ref + copy vs. by-value + std::move);
// only one of each can remain for this to compile — presumably the by-value/std::move
// variant is the intended one; confirm.
struct MowContext {
MowContext(int64_t version, int64_t txnid, const RowsetIdUnorderedSet& ids,
const std::vector<RowsetSharedPtr>& rowset_ptrs, std::shared_ptr<DeleteBitmap> db)
std::vector<RowsetSharedPtr> rowset_ptrs, std::shared_ptr<DeleteBitmap> db)
: max_version(version),
txn_id(txnid),
rowset_ids(ids),
rowset_ptrs(rowset_ptrs),
delete_bitmap(db) {}
rowset_ptrs(std::move(rowset_ptrs)),
delete_bitmap(std::move(db)) {}
int64_t max_version;
int64_t txn_id;
// Held as a reference bound to the constructor argument `ids`, not a copy: the set
// passed in must outlive this MowContext.
const RowsetIdUnorderedSet& rowset_ids;
std::vector<RowsetSharedPtr> rowset_ptrs;
std::shared_ptr<DeleteBitmap> delete_bitmap;
};

// used in mow partial update
// One entry of a partial-update read plan: a row id paired with a position in a block.
struct RidAndPos {
// row id of the row to read
uint32_t rid;
// pos in block
size_t pos;
};

// Read plan for partial update: rowset id -> (segment id -> list of {rid, pos} entries).
using PartialUpdateReadPlan = std::map<RowsetId, std::map<uint32_t, std::vector<RidAndPos>>>;

// used to control compaction
struct VersionWithTime {
std::atomic<int64_t> version;
Expand Down
Loading