Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/exec/rowid_fetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ Status RowIDFetcher::_merge_rpc_results(const PMultiGetRequest& request,
for (int i = 0; i < resp.binary_row_data_size(); ++i) {
vectorized::JsonbSerializeUtil::jsonb_to_block(
serdes, resp.binary_row_data(i).data(), resp.binary_row_data(i).size(),
col_uid_to_idx, *output_block, default_values);
col_uid_to_idx, *output_block, default_values, {});
}
return Status::OK();
}
Expand Down Expand Up @@ -405,7 +405,7 @@ Status RowIdStorageReader::read_by_rowids(const PMultiGetRequest& request,
row_loc.segment_id(), row_loc.ordinal_id());
// fetch by row store, more effcient way
if (request.fetch_row_store()) {
CHECK(tablet->tablet_schema()->store_row_column());
CHECK(tablet->tablet_schema()->has_row_store_for_all_columns());
RowLocation loc(rowset_id, segment->id(), row_loc.ordinal_id());
string* value = response->add_binary_row_data();
RETURN_IF_ERROR(scope_timer_run(
Expand Down
7 changes: 3 additions & 4 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ Status read_columns_by_plan(TabletSchemaSPtr tablet_schema,
const PartialUpdateReadPlan& read_plan,
const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
vectorized::Block& block, std::map<uint32_t, uint32_t>* read_index) {
bool has_row_column = tablet_schema->store_row_column();
bool has_row_column = tablet_schema->has_row_store_for_all_columns();
auto mutable_columns = block.mutate_columns();
size_t read_idx = 0;
for (auto rs_it : read_plan) {
Expand Down Expand Up @@ -449,7 +449,6 @@ Status BaseTablet::lookup_row_data(const Slice& encoded_key, const RowLocation&
BetaRowsetSharedPtr rowset = std::static_pointer_cast<BetaRowset>(input_rowset);
CHECK(rowset);
const TabletSchemaSPtr tablet_schema = rowset->tablet_schema();
CHECK(tablet_schema->store_row_column());
SegmentCacheHandle segment_cache_handle;
std::unique_ptr<segment_v2::ColumnIterator> column_iterator;
const auto& column = *DORIS_TRY(tablet_schema->column(BeConsts::ROW_STORE_COL));
Expand Down Expand Up @@ -874,7 +873,7 @@ Status BaseTablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset,

BetaRowsetSharedPtr rowset = std::static_pointer_cast<BetaRowset>(input_rowset);
CHECK(rowset);
CHECK(tablet_schema.store_row_column());
CHECK(tablet_schema.has_row_store_for_all_columns());
SegmentCacheHandle segment_cache_handle;
std::unique_ptr<segment_v2::ColumnIterator> column_iterator;
OlapReaderStatistics stats;
Expand All @@ -900,7 +899,7 @@ Status BaseTablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset,
serdes[i] = type->get_serde();
}
vectorized::JsonbSerializeUtil::jsonb_to_block(serdes, *string_column, col_uid_to_idx, block,
default_values);
default_values, {});
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ Status SegmentFlusher::_parse_variant_columns(vectorized::Block& block) {
}

vectorized::schema_util::ParseContext ctx;
ctx.record_raw_json_column = _context.tablet_schema->store_row_column();
ctx.record_raw_json_column = _context.tablet_schema->has_row_store_for_all_columns();
RETURN_IF_ERROR(vectorized::schema_util::parse_variant_columns(block, variant_column_pos, ctx));
return Status::OK();
}
Expand Down
42 changes: 17 additions & 25 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ void SegmentWriter::_maybe_invalid_row_cache(const std::string& key) {
// Just invalid row cache for simplicity, since the rowset is not visible at present.
// If we update/insert cache, if load failed rowset will not be visible but cached data
// will be visible, and lead to inconsistency.
if (!config::disable_storage_row_cache && _tablet_schema->store_row_column() &&
if (!config::disable_storage_row_cache && _tablet_schema->has_row_store_for_all_columns() &&
_opts.write_type == DataWriteType::TYPE_DIRECT) {
// invalidate cache
RowCache::instance()->erase({_opts.rowset_ctx->tablet_id, key});
Expand Down Expand Up @@ -437,27 +437,23 @@ void SegmentWriter::_serialize_block_to_row_column(vectorized::Block& block) {
}
MonotonicStopWatch watch;
watch.start();
// find row column id
int row_column_id = 0;
for (int i = 0; i < _tablet_schema->num_columns(); ++i) {
if (_tablet_schema->column(i).is_row_store_column()) {
row_column_id = i;
auto* row_store_column = static_cast<vectorized::ColumnString*>(
block.get_by_position(i).column->assume_mutable_ref().assume_mutable().get());
row_store_column->clear();
vectorized::DataTypeSerDeSPtrs serdes =
vectorized::create_data_type_serdes(block.get_data_types());
vectorized::JsonbSerializeUtil::block_to_jsonb(
*_tablet_schema, block, *row_store_column, _tablet_schema->num_columns(),
serdes,
{_tablet_schema->row_columns_uids().begin(),
_tablet_schema->row_columns_uids().end()});
break;
}
}
if (row_column_id == 0) {
return;
}
vectorized::ColumnString* row_store_column =
static_cast<vectorized::ColumnString*>(block.get_by_position(row_column_id)
.column->assume_mutable_ref()
.assume_mutable()
.get());
row_store_column->clear();
vectorized::DataTypeSerDeSPtrs serdes =
vectorized::create_data_type_serdes(block.get_data_types());
vectorized::JsonbSerializeUtil::block_to_jsonb(*_tablet_schema, block, *row_store_column,
_tablet_schema->num_columns(), serdes);

VLOG_DEBUG << "serialize , num_rows:" << block.rows() << ", row_column_id:" << row_column_id
<< ", total_byte_size:" << block.allocated_bytes() << ", serialize_cost(us)"
<< watch.elapsed_time() / 1000;
Expand Down Expand Up @@ -669,11 +665,8 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
RETURN_IF_ERROR(fill_missing_columns(mutable_full_columns, use_default_or_null_flag,
has_default_or_nullable, segment_start_pos, block));
full_block.set_columns(std::move(mutable_full_columns));
// row column should be filled here
if (_tablet_schema->store_row_column()) {
// convert block to row store format
_serialize_block_to_row_column(full_block);
}
// convert block to row store format
_serialize_block_to_row_column(full_block);

// convert missing columns and send to column writer
RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
Expand Down Expand Up @@ -741,7 +734,7 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
const auto& cids_missing = _opts.rowset_ctx->partial_update_info->missing_cids;
auto old_value_block = _tablet_schema->create_block_by_cids(cids_missing);
CHECK_EQ(cids_missing.size(), old_value_block.columns());
bool has_row_column = _tablet_schema->store_row_column();
bool has_row_column = _tablet_schema->has_row_store_for_all_columns();
// record real pos, key is input line num, value is old_block line num
std::map<uint32_t, uint32_t> read_index;
size_t read_idx = 0;
Expand Down Expand Up @@ -870,9 +863,8 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po
<< ", _column_writers.size()=" << _column_writers.size();
// Row column should be filled here when it's a directly write from memtable
// or it's schema change write(since column data type maybe changed, so we should reubild)
if (_tablet_schema->store_row_column() &&
(_opts.write_type == DataWriteType::TYPE_DIRECT ||
_opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE)) {
if (_opts.write_type == DataWriteType::TYPE_DIRECT ||
_opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE) {
_serialize_block_to_row_column(*const_cast<vectorized::Block*>(block));
}

Expand Down
42 changes: 18 additions & 24 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <ostream>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>

#include "cloud/config.h"
Expand Down Expand Up @@ -265,7 +266,7 @@ void VerticalSegmentWriter::_maybe_invalid_row_cache(const std::string& key) con
// Just invalid row cache for simplicity, since the rowset is not visible at present.
// If we update/insert cache, if load failed rowset will not be visible but cached data
// will be visible, and lead to inconsistency.
if (!config::disable_storage_row_cache && _tablet_schema->store_row_column() &&
if (!config::disable_storage_row_cache && _tablet_schema->has_row_store_for_all_columns() &&
_opts.write_type == DataWriteType::TYPE_DIRECT) {
// invalidate cache
RowCache::instance()->erase({_opts.rowset_ctx->tablet_id, key});
Expand All @@ -278,27 +279,23 @@ void VerticalSegmentWriter::_serialize_block_to_row_column(vectorized::Block& bl
}
MonotonicStopWatch watch;
watch.start();
// find row column id
int row_column_id = 0;
for (int i = 0; i < _tablet_schema->num_columns(); ++i) {
if (_tablet_schema->column(i).is_row_store_column()) {
row_column_id = i;
auto* row_store_column = static_cast<vectorized::ColumnString*>(
block.get_by_position(i).column->assume_mutable_ref().assume_mutable().get());
row_store_column->clear();
vectorized::DataTypeSerDeSPtrs serdes =
vectorized::create_data_type_serdes(block.get_data_types());
std::unordered_set<int> row_store_cids_set(_tablet_schema->row_columns_uids().begin(),
_tablet_schema->row_columns_uids().end());
vectorized::JsonbSerializeUtil::block_to_jsonb(
*_tablet_schema, block, *row_store_column, _tablet_schema->num_columns(),
serdes, row_store_cids_set);
break;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add break

}
if (row_column_id == 0) {
return;
}
auto* row_store_column =
static_cast<vectorized::ColumnString*>(block.get_by_position(row_column_id)
.column->assume_mutable_ref()
.assume_mutable()
.get());
row_store_column->clear();
vectorized::DataTypeSerDeSPtrs serdes =
vectorized::create_data_type_serdes(block.get_data_types());
vectorized::JsonbSerializeUtil::block_to_jsonb(*_tablet_schema, block, *row_store_column,
_tablet_schema->num_columns(), serdes);

VLOG_DEBUG << "serialize , num_rows:" << block.rows() << ", row_column_id:" << row_column_id
<< ", total_byte_size:" << block.allocated_bytes() << ", serialize_cost(us)"
<< watch.elapsed_time() / 1000;
Expand Down Expand Up @@ -500,10 +497,8 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
has_default_or_nullable, segment_start_pos, data.block));

// row column should be filled here
if (_tablet_schema->store_row_column()) {
// convert block to row store format
_serialize_block_to_row_column(full_block);
}
// convert block to row store format
_serialize_block_to_row_column(full_block);

// convert missing columns and send to column writer
const auto& missing_cids = _opts.rowset_ctx->partial_update_info->missing_cids;
Expand Down Expand Up @@ -567,7 +562,7 @@ Status VerticalSegmentWriter::_fill_missing_columns(
auto old_value_block = _tablet_schema->create_block_by_cids(missing_cids);
CHECK_EQ(missing_cids.size(), old_value_block.columns());
auto mutable_old_columns = old_value_block.mutate_columns();
bool has_row_column = _tablet_schema->store_row_column();
bool has_row_column = _tablet_schema->has_row_store_for_all_columns();
// record real pos, key is input line num, value is old_block line num
std::map<uint32_t, uint32_t> read_index;
size_t read_idx = 0;
Expand Down Expand Up @@ -833,9 +828,8 @@ Status VerticalSegmentWriter::write_batch() {
}
// Row column should be filled here when it's a directly write from memtable
// or it's schema change write(since column data type maybe changed, so we should reubild)
if (_tablet_schema->store_row_column() &&
(_opts.write_type == DataWriteType::TYPE_DIRECT ||
_opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE)) {
if (_opts.write_type == DataWriteType::TYPE_DIRECT ||
_opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE) {
for (auto& data : _batched_blocks) {
// TODO: maybe we should pass range to this method
_serialize_block_to_row_column(*const_cast<vectorized::Block*>(data.block));
Expand Down
11 changes: 11 additions & 0 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include "cloud/cloud_schema_change_job.h"
#include "cloud/config.h"
#include "common/consts.h"
#include "common/logging.h"
#include "common/signal_handler.h"
#include "common/status.h"
Expand Down Expand Up @@ -1315,6 +1316,16 @@ Status SchemaChangeJob::parse_request(const SchemaChangeParams& sc_params,
return Status::OK();
}

// if new tablet enable row store, or new tablet has different row store columns
if ((!base_tablet_schema->have_column(BeConsts::ROW_STORE_COL) &&
new_tablet_schema->have_column(BeConsts::ROW_STORE_COL)) ||
!std::equal(new_tablet_schema->row_columns_uids().begin(),
new_tablet_schema->row_columns_uids().end(),
base_tablet_schema->row_columns_uids().begin(),
base_tablet_schema->row_columns_uids().end())) {
*sc_directly = true;
}

for (size_t i = 0; i < new_tablet_schema->num_columns(); ++i) {
ColumnMapping* column_mapping = changer->get_mutable_column_mapping(i);
if (column_mapping->expr != nullptr) {
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,10 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
if (tablet_schema.__isset.skip_write_index_on_load) {
schema->set_skip_write_index_on_load(tablet_schema.skip_write_index_on_load);
}
if (tablet_schema.__isset.row_store_col_cids) {
schema->mutable_row_store_column_unique_ids()->Add(tablet_schema.row_store_col_cids.begin(),
tablet_schema.row_store_col_cids.end());
}
if (binlog_config.has_value()) {
BinlogConfig tmp_binlog_config;
tmp_binlog_config = binlog_config.value();
Expand Down
6 changes: 5 additions & 1 deletion be/src/olap/tablet_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,9 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema, bool ignore_extrac
} else {
_inverted_index_storage_format = schema.inverted_index_storage_format();
}

_row_store_column_unique_ids.assign(schema.row_store_column_unique_ids().begin(),
schema.row_store_column_unique_ids().end());
}

void TabletSchema::copy_from(const TabletSchema& tablet_schema) {
Expand Down Expand Up @@ -1034,7 +1037,6 @@ void TabletSchema::build_current_tablet_schema(int64_t index_id, int32_t version
_is_in_memory = ori_tablet_schema.is_in_memory();
_disable_auto_compaction = ori_tablet_schema.disable_auto_compaction();
_enable_single_replica_compaction = ori_tablet_schema.enable_single_replica_compaction();
_store_row_column = ori_tablet_schema.store_row_column();
_skip_write_index_on_load = ori_tablet_schema.skip_write_index_on_load();
_sort_type = ori_tablet_schema.sort_type();
_sort_col_num = ori_tablet_schema.sort_col_num();
Expand Down Expand Up @@ -1193,6 +1195,8 @@ void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_schema_pb) const {
tablet_schema_pb->set_compression_type(_compression_type);
tablet_schema_pb->set_version_col_idx(_version_col_idx);
tablet_schema_pb->set_inverted_index_storage_format(_inverted_index_storage_format);
tablet_schema_pb->mutable_row_store_column_unique_ids()->Assign(
_row_store_column_unique_ids.begin(), _row_store_column_unique_ids.end());
}

size_t TabletSchema::row_size() const {
Expand Down
14 changes: 12 additions & 2 deletions be/src/olap/tablet_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <gen_cpp/segment_v2.pb.h>
#include <parallel_hashmap/phmap.h>

#include <algorithm>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: inclusion of deprecated C++ header 'stdint.h'; consider using 'cstdint' instead [modernize-deprecated-headers]

Suggested change
#include <algorithm>
#include <cstdint>

#include <map>
#include <memory>
#include <string>
Expand All @@ -31,6 +32,7 @@
#include <utility>
#include <vector>

#include "common/consts.h"
#include "common/status.h"
#include "gutil/stringprintf.h"
#include "olap/olap_common.h"
Expand Down Expand Up @@ -342,8 +344,10 @@ class TabletSchema {
_enable_single_replica_compaction = enable_single_replica_compaction;
}
bool enable_single_replica_compaction() const { return _enable_single_replica_compaction; }
void set_store_row_column(bool store_row_column) { _store_row_column = store_row_column; }
bool store_row_column() const { return _store_row_column; }
// indicate if full row store column(all the columns encodes as row) exists
bool has_row_store_for_all_columns() const {
return _store_row_column && row_columns_uids().empty();
}
void set_skip_write_index_on_load(bool skip) { _skip_write_index_on_load = skip; }
bool skip_write_index_on_load() const { return _skip_write_index_on_load; }
int32_t delete_sign_idx() const { return _delete_sign_idx; }
Expand Down Expand Up @@ -474,6 +478,8 @@ class TabletSchema {
void update_tablet_columns(const TabletSchema& tablet_schema,
const std::vector<TColumn>& t_columns);

const std::vector<int32_t>& row_columns_uids() const { return _row_store_column_unique_ids; }

private:
friend bool operator==(const TabletSchema& a, const TabletSchema& b);
friend bool operator!=(const TabletSchema& a, const TabletSchema& b);
Expand Down Expand Up @@ -515,6 +521,10 @@ class TabletSchema {
bool _store_row_column = false;
bool _skip_write_index_on_load = false;
InvertedIndexStorageFormatPB _inverted_index_storage_format = InvertedIndexStorageFormatPB::V1;

// Contains column ids of which columns should be encoded into row store.
// ATTN: For compability reason empty cids means all columns of tablet schema are encoded to row column
std::vector<int32_t> _row_store_column_unique_ids;
};

bool operator==(const TabletSchema& a, const TabletSchema& b);
Expand Down
5 changes: 5 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ class RuntimeState {
_query_options.mysql_row_binary_format;
}

bool enable_short_circuit_query_access_column_store() const {
return _query_options.__isset.enable_short_circuit_query_access_column_store &&
_query_options.enable_short_circuit_query_access_column_store;
}

// Appends error to the _error_log if there is space
bool log_error(const std::string& error);

Expand Down
Loading