-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[fix](schema scanner)change schema_scanner::get_next_row to get_next_block #15718
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
24 commits
Select commit
Hold shift + click to select a range
70856aa
add schema_tables_scanner get_next_block
Cai-Yao bb01d2b
clang-format
Cai-Yao 5f54c42
clang-format 2
Cai-Yao f1d4f4e
fix
Cai-Yao 471102b
change schema_backends_scanner
Cai-Yao 8dffcce
modify charsets, collations
Cai-Yao 565268a
fix bug and modify columns
Cai-Yao a9a356a
modify left schema scanner
Cai-Yao 3d5a7d2
add regression-test
Cai-Yao 8249438
fix rowsets_scanner
Cai-Yao 945b008
fix
Cai-Yao 941e410
fix again
Cai-Yao 296b24d
assume_mutable replace mutable
Cai-Yao f0a04e3
fix regression test error and add sys tables test
Cai-Yao b3a4381
fix code format
Cai-Yao 849f04e
add License
Cai-Yao 8036bbb
fix regression test
Cai-Yao 52e59cf
fix regression again
Cai-Yao d1942e5
fix format
Cai-Yao 6978825
fix rebase bug
Cai-Yao 008dfda
change tuple_desc to column_desc
Cai-Yao 908c179
refactor
Cai-Yao e1a1146
fix
Cai-Yao 4f84de5
fix be ut bug
Cai-Yao File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -33,34 +33,24 @@ | |||||
| #include "exec/schema_scanner/schema_variables_scanner.h" | ||||||
| #include "exec/schema_scanner/schema_views_scanner.h" | ||||||
| #include "runtime/define_primitive_type.h" | ||||||
| #include "vec/columns/column.h" | ||||||
| #include "vec/common/string_ref.h" | ||||||
| #include "vec/core/block.h" | ||||||
|
|
||||||
| namespace doris { | ||||||
|
|
||||||
| DorisServer* SchemaScanner::_s_doris_server; | ||||||
|
|
||||||
| SchemaScanner::SchemaScanner(ColumnDesc* columns, int column_num) | ||||||
| SchemaScanner::SchemaScanner(const std::vector<ColumnDesc>& columns) | ||||||
| : _is_init(false), | ||||||
| _param(nullptr), | ||||||
| _columns(columns), | ||||||
| _column_num(column_num), | ||||||
| _tuple_desc(nullptr), | ||||||
| _schema_table_type(TSchemaTableType::SCH_INVALID) {} | ||||||
|
|
||||||
| SchemaScanner::SchemaScanner(ColumnDesc* columns, int column_num, TSchemaTableType::type type) | ||||||
| : _is_init(false), | ||||||
| _param(nullptr), | ||||||
| _columns(columns), | ||||||
| _column_num(column_num), | ||||||
| _tuple_desc(nullptr), | ||||||
| _schema_table_type(type) {} | ||||||
| SchemaScanner::SchemaScanner(const std::vector<ColumnDesc>& columns, TSchemaTableType::type type) | ||||||
| : _is_init(false), _param(nullptr), _columns(columns), _schema_table_type(type) {} | ||||||
|
|
||||||
| SchemaScanner::~SchemaScanner() { | ||||||
| if (_is_create_columns == true && _columns != nullptr) { | ||||||
| delete[] _columns; | ||||||
| _columns = nullptr; | ||||||
| } | ||||||
| } | ||||||
| // The destructor body is empty, so let the compiler generate it (clang-tidy modernize-use-equals-default). | ||||||
| SchemaScanner::~SchemaScanner() = default; | ||||||
|
|
||||||
| Status SchemaScanner::start(RuntimeState* state) { | ||||||
| if (!_is_init) { | ||||||
|
|
@@ -70,12 +60,12 @@ Status SchemaScanner::start(RuntimeState* state) { | |||||
| return Status::OK(); | ||||||
| } | ||||||
|
|
||||||
| Status SchemaScanner::get_next_row(Tuple* tuple, MemPool* pool, bool* eos) { | ||||||
| Status SchemaScanner::get_next_block(vectorized::Block* block, bool* eos) { | ||||||
| if (!_is_init) { | ||||||
| return Status::InternalError("used before initialized."); | ||||||
| } | ||||||
|
|
||||||
| if (nullptr == tuple || nullptr == pool || nullptr == eos) { | ||||||
| if (nullptr == block || nullptr == eos) { | ||||||
| return Status::InternalError("input pointer is nullptr."); | ||||||
| } | ||||||
|
|
||||||
|
|
@@ -91,15 +81,12 @@ Status SchemaScanner::init(SchemaScannerParam* param, ObjectPool* pool) { | |||||
| return Status::InternalError("invalid parameter"); | ||||||
| } | ||||||
|
|
||||||
| if (_schema_table_type == TSchemaTableType::SCH_BACKENDS) { | ||||||
| RETURN_IF_ERROR(create_columns(param->table_structure, pool)); | ||||||
| } | ||||||
|
|
||||||
| if (nullptr == _columns) { | ||||||
| if (_columns.empty()) { | ||||||
| return Status::InternalError("invalid parameter"); | ||||||
| } | ||||||
|
|
||||||
| RETURN_IF_ERROR(create_tuple_desc(pool)); | ||||||
|
|
||||||
| _param = param; | ||||||
| _is_init = true; | ||||||
|
|
||||||
|
|
@@ -145,23 +132,161 @@ SchemaScanner* SchemaScanner::create(TSchemaTableType::type type) { | |||||
| } | ||||||
| } | ||||||
|
|
||||||
| Status SchemaScanner::create_columns(const std::vector<TSchemaTableStructure>* table_structure, | ||||||
| ObjectPool* pool) { | ||||||
| _column_num = table_structure->size(); | ||||||
| _columns = new ColumnDesc[_column_num]; | ||||||
| _is_create_columns = true; | ||||||
| for (size_t idx = 0; idx < table_structure->size(); ++idx) { | ||||||
| _columns[idx].name = table_structure->at(idx).column_name.c_str(); | ||||||
| _columns[idx].type = thrift_to_type(table_structure->at(idx).type); | ||||||
| _columns[idx].size = table_structure->at(idx).len; | ||||||
| _columns[idx].is_null = table_structure->at(idx).is_null; | ||||||
| Status SchemaScanner::fill_dest_column(vectorized::Block* block, void* data, | ||||||
| const ColumnDesc& col_desc) { | ||||||
| if (!block->has(col_desc.name)) { | ||||||
| return Status::OK(); | ||||||
| } | ||||||
| vectorized::MutableColumnPtr column_ptr = | ||||||
| std::move(*block->get_by_name(col_desc.name).column).assume_mutable(); | ||||||
|
Contributor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: std::move of the const expression has no effect; remove std::move() [performance-move-const-arg]
Suggested change
|
||||||
| vectorized::IColumn* col_ptr = column_ptr.get(); | ||||||
|
|
||||||
| if (data == nullptr) { | ||||||
| auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(col_ptr); | ||||||
| nullable_column->insert_data(nullptr, 0); | ||||||
| return Status::OK(); | ||||||
| } | ||||||
| auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(col_ptr); | ||||||
| nullable_column->get_null_map_data().push_back(0); | ||||||
| col_ptr = &nullable_column->get_nested_column(); | ||||||
| switch (col_desc.type) { | ||||||
| case TYPE_HLL: { | ||||||
| HyperLogLog* hll_slot = reinterpret_cast<HyperLogLog*>(data); | ||||||
| reinterpret_cast<vectorized::ColumnHLL*>(col_ptr)->get_data().emplace_back(*hll_slot); | ||||||
| break; | ||||||
| } | ||||||
| case TYPE_VARCHAR: | ||||||
| case TYPE_CHAR: | ||||||
| case TYPE_STRING: { | ||||||
| StringRef* str_slot = reinterpret_cast<StringRef*>(data); | ||||||
| reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_slot->data, | ||||||
| str_slot->size); | ||||||
| break; | ||||||
| } | ||||||
|
|
||||||
| case TYPE_BOOLEAN: { | ||||||
| uint8_t num = *reinterpret_cast<bool*>(data); | ||||||
| reinterpret_cast<vectorized::ColumnVector<vectorized::UInt8>*>(col_ptr)->insert_value(num); | ||||||
| break; | ||||||
| } | ||||||
|
|
||||||
| case TYPE_TINYINT: { | ||||||
| int8_t num = *reinterpret_cast<int8_t*>(data); | ||||||
| reinterpret_cast<vectorized::ColumnVector<vectorized::Int8>*>(col_ptr)->insert_value(num); | ||||||
| break; | ||||||
| } | ||||||
|
|
||||||
| case TYPE_SMALLINT: { | ||||||
| int16_t num = *reinterpret_cast<int16_t*>(data); | ||||||
| reinterpret_cast<vectorized::ColumnVector<vectorized::Int16>*>(col_ptr)->insert_value(num); | ||||||
| break; | ||||||
| } | ||||||
|
|
||||||
| case TYPE_INT: { | ||||||
| int32_t num = *reinterpret_cast<int32_t*>(data); | ||||||
| reinterpret_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr)->insert_value(num); | ||||||
| break; | ||||||
| } | ||||||
|
|
||||||
| case TYPE_BIGINT: { | ||||||
| int64_t num = *reinterpret_cast<int64_t*>(data); | ||||||
| reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(num); | ||||||
| break; | ||||||
| } | ||||||
|
|
||||||
| case TYPE_LARGEINT: { | ||||||
| __int128 num; | ||||||
| memcpy(&num, data, sizeof(__int128)); | ||||||
| reinterpret_cast<vectorized::ColumnVector<vectorized::Int128>*>(col_ptr)->insert_value(num); | ||||||
| break; | ||||||
| } | ||||||
|
|
||||||
| case TYPE_FLOAT: { | ||||||
| float num = *reinterpret_cast<float*>(data); | ||||||
| reinterpret_cast<vectorized::ColumnVector<vectorized::Float32>*>(col_ptr)->insert_value( | ||||||
| num); | ||||||
| break; | ||||||
| } | ||||||
|
|
||||||
| case TYPE_DOUBLE: { | ||||||
| double num = *reinterpret_cast<double*>(data); | ||||||
| reinterpret_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)->insert_value( | ||||||
| num); | ||||||
| break; | ||||||
| } | ||||||
|
|
||||||
| case TYPE_DATE: { | ||||||
| vectorized::VecDateTimeValue value; | ||||||
| DateTimeValue* ts_slot = reinterpret_cast<DateTimeValue*>(data); | ||||||
| value.convert_dt_to_vec_dt(ts_slot); | ||||||
| reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data( | ||||||
| reinterpret_cast<char*>(&value), 0); | ||||||
| break; | ||||||
| } | ||||||
|
|
||||||
| case TYPE_DATEV2: { | ||||||
| uint32_t num = *reinterpret_cast<uint32_t*>(data); | ||||||
| reinterpret_cast<vectorized::ColumnVector<vectorized::UInt32>*>(col_ptr)->insert_value(num); | ||||||
| break; | ||||||
| } | ||||||
|
|
||||||
| case TYPE_DATETIME: { | ||||||
| vectorized::VecDateTimeValue value; | ||||||
| DateTimeValue* ts_slot = reinterpret_cast<DateTimeValue*>(data); | ||||||
| value.convert_dt_to_vec_dt(ts_slot); | ||||||
| reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data( | ||||||
| reinterpret_cast<char*>(&value), 0); | ||||||
| break; | ||||||
| } | ||||||
|
|
||||||
| case TYPE_DATETIMEV2: { | ||||||
| // The slot is read as a 64-bit value and stored in a UInt64 column, so the local must be 64 bits wide too; a uint32_t local would silently drop the upper 32 bits. | ||||||
| uint64_t num = *reinterpret_cast<uint64_t*>(data); | ||||||
| reinterpret_cast<vectorized::ColumnVector<vectorized::UInt64>*>(col_ptr)->insert_value(num); | ||||||
| break; | ||||||
| } | ||||||
|
|
||||||
| case TYPE_DECIMALV2: { | ||||||
| const vectorized::Int128 num = (reinterpret_cast<PackedInt128*>(data))->value; | ||||||
| reinterpret_cast<vectorized::ColumnDecimal128*>(col_ptr)->insert_data( | ||||||
| reinterpret_cast<const char*>(&num), 0); | ||||||
| break; | ||||||
| } | ||||||
| case TYPE_DECIMAL128I: { | ||||||
| const vectorized::Int128 num = (reinterpret_cast<PackedInt128*>(data))->value; | ||||||
| reinterpret_cast<vectorized::ColumnDecimal128I*>(col_ptr)->insert_data( | ||||||
| reinterpret_cast<const char*>(&num), 0); | ||||||
| break; | ||||||
| } | ||||||
|
|
||||||
| case TYPE_DECIMAL32: { | ||||||
| const int32_t num = *reinterpret_cast<int32_t*>(data); | ||||||
| reinterpret_cast<vectorized::ColumnDecimal32*>(col_ptr)->insert_data( | ||||||
| reinterpret_cast<const char*>(&num), 0); | ||||||
| break; | ||||||
| } | ||||||
|
|
||||||
| case TYPE_DECIMAL64: { | ||||||
| const int64_t num = *reinterpret_cast<int64_t*>(data); | ||||||
| reinterpret_cast<vectorized::ColumnDecimal64*>(col_ptr)->insert_data( | ||||||
| reinterpret_cast<const char*>(&num), 0); | ||||||
| break; | ||||||
| } | ||||||
|
|
||||||
| default: { | ||||||
| DCHECK(false) << "bad slot type: " << col_desc.type; | ||||||
| std::stringstream ss; | ||||||
| ss << "Fail to convert schema type:'" << col_desc.type << " on column:`" | ||||||
| << std::string(col_desc.name) + "`"; | ||||||
| return Status::InternalError(ss.str()); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| return Status::OK(); | ||||||
| } | ||||||
|
|
||||||
| Status SchemaScanner::create_tuple_desc(ObjectPool* pool) { | ||||||
| int null_column = 0; | ||||||
| for (int i = 0; i < _column_num; ++i) { | ||||||
| for (int i = 0; i < _columns.size(); ++i) { | ||||||
| if (_columns[i].is_null) { | ||||||
| null_column++; | ||||||
| } | ||||||
|
|
@@ -172,7 +297,7 @@ Status SchemaScanner::create_tuple_desc(ObjectPool* pool) { | |||||
| int null_byte = 0; | ||||||
| int null_bit = 0; | ||||||
|
|
||||||
| for (int i = 0; i < _column_num; ++i) { | ||||||
| for (int i = 0; i < _columns.size(); ++i) { | ||||||
| TSlotDescriptor t_slot_desc; | ||||||
| if (_columns[i].type == TYPE_DECIMALV2) { | ||||||
| t_slot_desc.__set_slotType(TypeDescriptor::create_decimalv2_type(27, 9).to_thrift()); | ||||||
|
|
||||||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: use '= default' to define a trivial destructor [modernize-use-equals-default]