Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 191 additions & 34 deletions be/src/exec/schema_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,33 +34,23 @@
#include "exec/schema_scanner/schema_views_scanner.h"
#include "runtime/define_primitive_type.h"
#include "runtime/string_value.h"
#include "vec/columns/column.h"
#include "vec/core/block.h"

namespace doris {

DorisServer* SchemaScanner::_s_doris_server;

SchemaScanner::SchemaScanner(ColumnDesc* columns, int column_num)
SchemaScanner::SchemaScanner(const std::vector<ColumnDesc>& columns)
: _is_init(false),
_param(nullptr),
_columns(columns),
_column_num(column_num),
_tuple_desc(nullptr),
_schema_table_type(TSchemaTableType::SCH_INVALID) {}

SchemaScanner::SchemaScanner(ColumnDesc* columns, int column_num, TSchemaTableType::type type)
: _is_init(false),
_param(nullptr),
_columns(columns),
_column_num(column_num),
_tuple_desc(nullptr),
_schema_table_type(type) {}

SchemaScanner::~SchemaScanner() {
if (_is_create_columns == true && _columns != nullptr) {
delete[] _columns;
_columns = nullptr;
}
}
SchemaScanner::SchemaScanner(const std::vector<ColumnDesc>& columns, TSchemaTableType::type type)
: _is_init(false), _param(nullptr), _columns(columns), _schema_table_type(type) {}

SchemaScanner::~SchemaScanner() = default;

Status SchemaScanner::start(RuntimeState* state) {
if (!_is_init) {
Expand All @@ -83,6 +73,19 @@ Status SchemaScanner::get_next_row(Tuple* tuple, MemPool* pool, bool* eos) {
return Status::OK();
}

Status SchemaScanner::get_next_block(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("used before initialized.");
}

if (nullptr == block || nullptr == eos) {
return Status::InternalError("input pointer is nullptr.");
}

*eos = true;
return Status::OK();
}

Status SchemaScanner::init(SchemaScannerParam* param, ObjectPool* pool) {
if (_is_init) {
return Status::OK();
Expand All @@ -91,18 +94,22 @@ Status SchemaScanner::init(SchemaScannerParam* param, ObjectPool* pool) {
return Status::InternalError("invalid parameter");
}

if (_schema_table_type == TSchemaTableType::SCH_BACKENDS) {
RETURN_IF_ERROR(create_columns(param->table_structure, pool));
}

if (nullptr == _columns) {
if (_columns.empty()) {
return Status::InternalError("invalid parameter");
}

RETURN_IF_ERROR(create_tuple_desc(pool));

_param = param;
_is_init = true;

if (_param->profile) {
_get_db_timer = ADD_TIMER(_param->profile, "GetDbTime");
_get_table_timer = ADD_TIMER(_param->profile, "GetTableTime");
_get_describe_timer = ADD_TIMER(_param->profile, "GetDescribeTime");
_fill_block_timer = ADD_TIMER(_param->profile, "FillBlockTime");
}

return Status::OK();
}

Expand Down Expand Up @@ -145,23 +152,173 @@ SchemaScanner* SchemaScanner::create(TSchemaTableType::type type) {
}
}

Status SchemaScanner::create_columns(const std::vector<TSchemaTableStructure>* table_structure,
ObjectPool* pool) {
_column_num = table_structure->size();
_columns = new ColumnDesc[_column_num];
_is_create_columns = true;
for (size_t idx = 0; idx < table_structure->size(); ++idx) {
_columns[idx].name = table_structure->at(idx).column_name.c_str();
_columns[idx].type = thrift_to_type(table_structure->at(idx).type);
_columns[idx].size = table_structure->at(idx).len;
_columns[idx].is_null = table_structure->at(idx).is_null;
Status SchemaScanner::fill_dest_column_for_range(vectorized::Block* block, size_t pos,
const std::vector<void*>& datas) {
const ColumnDesc& col_desc = _columns[pos];
vectorized::MutableColumnPtr column_ptr;
column_ptr = std::move(*block->get_by_position(pos).column).assume_mutable();
vectorized::IColumn* col_ptr = column_ptr.get();

auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(col_ptr);

// Resize in advance to improve insertion efficiency.
size_t fill_num = datas.size();
col_ptr = &nullable_column->get_nested_column();
for (int i = 0; i < fill_num; ++i) {
auto data = datas[i];
if (data == nullptr) {
// For nested column need not insert default.
nullable_column->insert_data(nullptr, 0);
continue;
} else {
nullable_column->get_null_map_data().emplace_back(0);
}
switch (col_desc.type) {
case TYPE_HLL: {
HyperLogLog* hll_slot = reinterpret_cast<HyperLogLog*>(data);
reinterpret_cast<vectorized::ColumnHLL*>(col_ptr)->get_data().emplace_back(*hll_slot);
break;
}
case TYPE_VARCHAR:
case TYPE_CHAR:
case TYPE_STRING: {
StringRef* str_slot = reinterpret_cast<StringRef*>(data);
reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_slot->data,
str_slot->size);
break;
}

case TYPE_BOOLEAN: {
uint8_t num = *reinterpret_cast<bool*>(data);
reinterpret_cast<vectorized::ColumnVector<vectorized::UInt8>*>(col_ptr)->insert_value(
num);
break;
}

case TYPE_TINYINT: {
int8_t num = *reinterpret_cast<int8_t*>(data);
reinterpret_cast<vectorized::ColumnVector<vectorized::Int8>*>(col_ptr)->insert_value(
num);
break;
}

case TYPE_SMALLINT: {
int16_t num = *reinterpret_cast<int16_t*>(data);
reinterpret_cast<vectorized::ColumnVector<vectorized::Int16>*>(col_ptr)->insert_value(
num);
break;
}

case TYPE_INT: {
int32_t num = *reinterpret_cast<int32_t*>(data);
reinterpret_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr)->insert_value(
num);
break;
}

case TYPE_BIGINT: {
int64_t num = *reinterpret_cast<int64_t*>(data);
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(
num);
break;
}

case TYPE_LARGEINT: {
__int128 num;
memcpy(&num, data, sizeof(__int128));
reinterpret_cast<vectorized::ColumnVector<vectorized::Int128>*>(col_ptr)->insert_value(
num);
break;
}

case TYPE_FLOAT: {
float num = *reinterpret_cast<float*>(data);
reinterpret_cast<vectorized::ColumnVector<vectorized::Float32>*>(col_ptr)->insert_value(
num);
break;
}

case TYPE_DOUBLE: {
double num = *reinterpret_cast<double*>(data);
reinterpret_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)->insert_value(
num);
break;
}

case TYPE_DATE: {
vectorized::VecDateTimeValue value;
DateTimeValue* ts_slot = reinterpret_cast<DateTimeValue*>(data);
value.convert_dt_to_vec_dt(ts_slot);
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data(
reinterpret_cast<char*>(&value), 0);
break;
}

case TYPE_DATEV2: {
uint32_t num = *reinterpret_cast<uint32_t*>(data);
reinterpret_cast<vectorized::ColumnVector<vectorized::UInt32>*>(col_ptr)->insert_value(
num);
break;
}

case TYPE_DATETIME: {
vectorized::VecDateTimeValue value;
DateTimeValue* ts_slot = reinterpret_cast<DateTimeValue*>(data);
value.convert_dt_to_vec_dt(ts_slot);
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data(
reinterpret_cast<char*>(&value), 0);
break;
}

case TYPE_DATETIMEV2: {
uint32_t num = *reinterpret_cast<uint64_t*>(data);
reinterpret_cast<vectorized::ColumnVector<vectorized::UInt64>*>(col_ptr)->insert_value(
num);
break;
}

case TYPE_DECIMALV2: {
const vectorized::Int128 num = (reinterpret_cast<PackedInt128*>(data))->value;
reinterpret_cast<vectorized::ColumnDecimal128*>(col_ptr)->insert_data(
reinterpret_cast<const char*>(&num), 0);
break;
}
case TYPE_DECIMAL128I: {
const vectorized::Int128 num = (reinterpret_cast<PackedInt128*>(data))->value;
reinterpret_cast<vectorized::ColumnDecimal128I*>(col_ptr)->insert_data(
reinterpret_cast<const char*>(&num), 0);
break;
}

case TYPE_DECIMAL32: {
const int32_t num = *reinterpret_cast<int32_t*>(data);
reinterpret_cast<vectorized::ColumnDecimal32*>(col_ptr)->insert_data(
reinterpret_cast<const char*>(&num), 0);
break;
}

case TYPE_DECIMAL64: {
const int64_t num = *reinterpret_cast<int64_t*>(data);
reinterpret_cast<vectorized::ColumnDecimal64*>(col_ptr)->insert_data(
reinterpret_cast<const char*>(&num), 0);
break;
}

default: {
DCHECK(false) << "bad slot type: " << col_desc.type;
std::stringstream ss;
ss << "Fail to convert schema type:'" << col_desc.type << " on column:`"
<< std::string(col_desc.name) + "`";
return Status::InternalError(ss.str());
}
}
}
return Status::OK();
}

Status SchemaScanner::create_tuple_desc(ObjectPool* pool) {
int null_column = 0;
for (int i = 0; i < _column_num; ++i) {
for (int i = 0; i < _columns.size(); ++i) {
if (_columns[i].is_null) {
null_column++;
}
Expand All @@ -172,7 +329,7 @@ Status SchemaScanner::create_tuple_desc(ObjectPool* pool) {
int null_byte = 0;
int null_bit = 0;

for (int i = 0; i < _column_num; ++i) {
for (int i = 0; i < _columns.size(); ++i) {
TSlotDescriptor t_slot_desc;
if (_columns[i].type == TYPE_DECIMALV2) {
t_slot_desc.__set_slotType(TypeDescriptor::create_decimalv2_type(27, 9).to_thrift());
Expand Down
33 changes: 21 additions & 12 deletions be/src/exec/schema_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,19 @@
#include "gen_cpp/Types_types.h"
#include "runtime/mem_pool.h"
#include "runtime/tuple.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"

namespace doris {

// forehead declare class, because jni function init in DorisServer.
class DorisServer;
class RuntimeState;

namespace vectorized {
class Block;
}

// scanner parameter from frontend
struct SchemaScannerParam {
const std::string* db;
Expand All @@ -45,6 +51,7 @@ struct SchemaScannerParam {
int64_t thread_id;
const std::vector<TSchemaTableStructure>* table_structure;
const std::string* catalog;
std::unique_ptr<RuntimeProfile> profile;

SchemaScannerParam()
: db(nullptr),
Expand All @@ -70,43 +77,45 @@ class SchemaScanner {
int precision = -1;
int scale = -1;
};
SchemaScanner(ColumnDesc* columns, int column_num);
SchemaScanner(ColumnDesc* columns, int column_num, TSchemaTableType::type type);
SchemaScanner(const std::vector<ColumnDesc>& columns);
SchemaScanner(const std::vector<ColumnDesc>& columns, TSchemaTableType::type type);
virtual ~SchemaScanner();

// init object need information, schema etc.
virtual Status init(SchemaScannerParam* param, ObjectPool* pool);
// Start to work
virtual Status start(RuntimeState* state);
virtual Status get_next_row(Tuple* tuple, MemPool* pool, bool* eos);
virtual Status get_next_block(vectorized::Block* block, bool* eos);
const std::vector<ColumnDesc>& get_column_desc() const { return _columns; }
// factory function
static SchemaScanner* create(TSchemaTableType::type type);

const TupleDescriptor* tuple_desc() const { return _tuple_desc; }
const TSchemaTableType::type type() const { return _schema_table_type; }

static void set_doris_server(DorisServer* doris_server) { _s_doris_server = doris_server; }

protected:
Status fill_dest_column_for_range(vectorized::Block* block, size_t pos,
const std::vector<void*>& datas);
Status create_tuple_desc(ObjectPool* pool);
Status create_columns(const std::vector<TSchemaTableStructure>* table_structure,
ObjectPool* pool);

bool _is_init;
// this is used for sub class
SchemaScannerParam* _param;
// pointer to schema table's column desc
ColumnDesc* _columns;
// num of columns
int _column_num;
// schema table's column desc
std::vector<ColumnDesc> _columns;
TupleDescriptor* _tuple_desc;

// _is_create_columns means if ColumnDesc is created from FE.
// `_columns` should be deleted if _is_create_columns = true.
bool _is_create_columns = false;

static DorisServer* _s_doris_server;

TSchemaTableType::type _schema_table_type;

RuntimeProfile::Counter* _get_db_timer = nullptr;
RuntimeProfile::Counter* _get_table_timer = nullptr;
RuntimeProfile::Counter* _get_describe_timer = nullptr;
RuntimeProfile::Counter* _fill_block_timer = nullptr;
};

} // namespace doris
Loading