8 changes: 4 additions & 4 deletions be/src/olap/push_handler.cpp
@@ -671,11 +671,11 @@ Status PushBrokerReader::_get_next_reader() {
             const_cast<cctz::time_zone*>(&_runtime_state->timezone_obj()),
             _io_ctx.get(), _runtime_state.get());
 
-    std::vector<std::string> place_holder;
     init_status = parquet_reader->init_reader(
-            _all_col_names, place_holder, _colname_to_value_range, _push_down_exprs,
-            _real_tuple_desc, _default_val_row_desc.get(), _col_name_to_slot_id,
-            &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts, false);
+            _all_col_names, _colname_to_value_range, _push_down_exprs, _real_tuple_desc,
+            _default_val_row_desc.get(), _col_name_to_slot_id,
+            &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts,
+            vectorized::TableSchemaChangeHelper::ConstNode::get_instance(), false);
    _cur_reader = std::move(parquet_reader);
    if (!init_status.ok()) {
        return Status::InternalError("failed to init reader for file {}, err: {}", range.path,
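The call-site change above reflects the new reader contract: the throwaway place_holder vector for missing columns is gone, and init_reader instead receives a schema-mapping node, with TableSchemaChangeHelper::ConstNode::get_instance() serving as the identity mapping when table and file schemas match. A minimal sketch of that shared-singleton default (illustrative names, not the actual Doris types):

#include <memory>

// A stateless "identity mapping" node: table column names equal file column
// names, so one shared instance can serve every reader.
struct ConstNode {
    static std::shared_ptr<ConstNode> get_instance() {
        static auto instance = std::make_shared<ConstNode>();
        return instance;
    }
};

int main() {
    // Both calls return the same object, so the defaulted argument is cheap.
    return ConstNode::get_instance() == ConstNode::get_instance() ? 0 : 1;
}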
2 changes: 1 addition & 1 deletion be/src/runtime/descriptors.h
@@ -103,7 +103,7 @@ class SlotDescriptor {
     MOCK_REMOVE(const) vectorized::DataTypePtr _type;
     const TupleId _parent;
     const int _col_pos;
-    const std::string _col_name;
+    MOCK_REMOVE(const) std::string _col_name;
     const std::string _col_name_lower_case;
 
     const int32_t _col_unique_id;
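MOCK_REMOVE(const) drops the const qualifier in mock/test builds so unit tests can overwrite _col_name on a stubbed slot, while release builds keep the member immutable. A plausible definition, assuming a BE_TEST-style build flag (the actual Doris macro may differ):

#include <string>

#ifdef BE_TEST
#define MOCK_REMOVE(qualifier)            // test build: qualifier removed
#else
#define MOCK_REMOVE(qualifier) qualifier  // release build: qualifier kept
#endif

struct SlotDescriptor {
    MOCK_REMOVE(const) std::string _col_name;
};

int main() {
    SlotDescriptor slot{"c1"};
#ifdef BE_TEST
    slot._col_name = "c1_renamed";  // only compiles when const was removed
#endif
    return 0;
}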
386 changes: 162 additions & 224 deletions be/src/vec/exec/format/orc/vorc_reader.cpp

Large diffs are not rendered by default.

91 changes: 47 additions & 44 deletions be/src/vec/exec/format/orc/vorc_reader.h
@@ -52,6 +52,7 @@
 #include "vec/exec/format/column_type_convert.h"
 #include "vec/exec/format/format_common.h"
 #include "vec/exec/format/generic_reader.h"
+#include "vec/exec/format/table/table_format_reader.h"
 #include "vec/exec/format/table/transactional_hive_reader.h"
 #include "vec/exprs/vliteral.h"
 #include "vec/exprs/vslot_ref.h"
@@ -116,6 +117,12 @@ class OrcReader : public GenericReader {
     ENABLE_FACTORY_CREATOR(OrcReader);
 
 public:
+    Status get_file_type(const orc::Type** root) {
+        RETURN_IF_ERROR(_create_file_reader());
+        *root = &(_reader->getType());
+        return Status::OK();
+    }
+
     struct Statistics {
         int64_t fs_read_time = 0;
         int64_t fs_read_calls = 0;
@@ -143,21 +150,19 @@ class OrcReader : public GenericReader {
     //If you want to read the file by index instead of column name, set hive_use_column_names to false.
     Status init_reader(
             const std::vector<std::string>* column_names,
-            const std::vector<std::string>& missing_column_names,
             const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
             const VExprContextSPtrs& conjuncts, bool is_acid,
             const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
             const VExprContextSPtrs* not_single_slot_filter_conjuncts,
             const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts,
-            const bool hive_use_column_names = true);
+            std::shared_ptr<TableSchemaChangeHelper::Node> table_info_node_ptr =
+                    TableSchemaChangeHelper::ConstNode::get_instance());
 
     Status set_fill_columns(
             const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
                     partition_columns,
             const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) override;
 
-    Status _init_select_types(const orc::Type& type, int idx);
-
     Status _fill_partition_columns(
             Block* block, uint64_t rows,
             const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
@@ -185,14 +190,6 @@ class OrcReader : public GenericReader {
     Status get_parsed_schema(std::vector<std::string>* col_names,
                              std::vector<DataTypePtr>* col_types) override;
 
-    Status get_schema_col_name_attribute(std::vector<std::string>* col_names,
-                                         std::vector<int32_t>* col_attributes,
-                                         const std::string& attribute, bool* exist_attribute);
-    void set_table_col_to_file_col(
-            std::unordered_map<std::string, std::string> table_col_to_file_col) {
-        _table_col_to_file_col = table_col_to_file_col;
-    }
-
     void set_position_delete_rowids(std::vector<int64_t>* delete_rows) {
         _position_delete_ordered_rowids = delete_rows;
     }
@@ -221,6 +218,16 @@ class OrcReader : public GenericReader {
         _row_id_column_iterator_pair = iterator_pair;
     }
 
+    static bool inline is_hive1_col_name(const orc::Type* orc_type_ptr) {
+        for (uint64_t idx = 0; idx < orc_type_ptr->getSubtypeCount(); idx++) {
+            if (!_is_hive1_col_name(orc_type_ptr->getFieldName(idx))) {
+                return false;
+            }
+        }
+        return true;
+    }
+    static const orc::Type& remove_acid(const orc::Type& type);
+
 protected:
     void _collect_profile_before_close() override;
 
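The new public is_hive1_col_name reports whether every top-level field in the ORC schema uses Hive 1.x positional names (_col0, _col1, ...), which carry no real column information and force reading by index; the private _is_hive1_col_name it relies on is outside this hunk. A standalone sketch of the per-name check (my reconstruction, not the Doris implementation):

#include <cctype>
#include <iostream>
#include <string>
#include <vector>

// Hive 1.x ORC writers emit _col0, _col1, ... instead of real column names.
static bool is_hive1_col_name(const std::string& name) {
    const std::string prefix = "_col";
    if (name.size() <= prefix.size() || name.compare(0, prefix.size(), prefix) != 0) {
        return false;
    }
    for (size_t i = prefix.size(); i < name.size(); ++i) {
        if (!std::isdigit(static_cast<unsigned char>(name[i]))) {
            return false;
        }
    }
    return true;
}

int main() {
    const std::vector<std::string> fields = {"_col0", "_col1", "_col2"};
    bool all_hive1 = true;
    for (const auto& f : fields) {
        all_hive1 = all_hive1 && is_hive1_col_name(f);
    }
    // Every field is positional, so the file must be read by column index.
    std::cout << (all_hive1 ? "read by index" : "read by name") << "\n";
    return 0;
}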
@@ -298,12 +305,8 @@ class OrcReader : public GenericReader {
 
     void _init_profile();
     Status _init_read_columns();
-    void _init_orc_cols(const orc::Type& type, std::vector<std::string>& orc_cols,
-                        std::vector<std::string>& orc_cols_lower_case,
-                        std::unordered_map<std::string, const orc::Type*>& type_map,
-                        bool* is_hive1_orc, bool should_add_acid_prefix) const;
+
     static bool _check_acid_schema(const orc::Type& type);
-    static const orc::Type& _remove_acid(const orc::Type& type);
 
     // functions for building search argument until _init_search_argument
     std::tuple<bool, orc::Literal, orc::PredicateDataType> _make_orc_literal(
@@ -332,12 +335,15 @@ class OrcReader : public GenericReader {
 
     template <bool is_filter = false>
     Status _fill_doris_data_column(const std::string& col_name, MutableColumnPtr& data_column,
-                                   const DataTypePtr& data_type, const orc::Type* orc_column_type,
+                                   const DataTypePtr& data_type,
+                                   std::shared_ptr<TableSchemaChangeHelper::Node> root_node,
+                                   const orc::Type* orc_column_type,
                                    const orc::ColumnVectorBatch* cvb, size_t num_values);
 
     template <bool is_filter = false>
     Status _orc_column_to_doris_column(const std::string& col_name, ColumnPtr& doris_column,
                                        const DataTypePtr& data_type,
+                                       std::shared_ptr<TableSchemaChangeHelper::Node> root_node,
                                        const orc::Type* orc_column_type,
                                        const orc::ColumnVectorBatch* cvb, size_t num_values);
 
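Both conversion entry points now thread a root_node through the recursion along with the ORC type, so each level can resolve the table-side field name to the file-side name before descending into nested children. A toy model of that lookup-then-recurse pattern (this Node is invented for illustration, not the TableSchemaChangeHelper API):

#include <iostream>
#include <map>
#include <memory>
#include <string>

// Each level maps table column names to file column names and owns the
// mapping node for that child's subtree.
struct Node {
    std::map<std::string, std::string> table_to_file;
    std::map<std::string, std::shared_ptr<Node>> children;
};

// Resolve the file name at this level, then recurse with the child node,
// mirroring how root_node accompanies the orc::Type down the conversion.
void convert_column(const std::shared_ptr<Node>& node, const std::string& table_col,
                    int depth = 0) {
    auto it = node->table_to_file.find(table_col);
    if (it == node->table_to_file.end()) {
        return;  // not in the file: would be filled with defaults instead
    }
    std::cout << std::string(depth * 2, ' ') << table_col << " -> " << it->second << "\n";
    auto child = node->children.find(table_col);
    if (child != node->children.end()) {
        for (const auto& [sub_col, unused] : child->second->table_to_file) {
            convert_column(child->second, sub_col, depth + 1);
        }
    }
}

int main() {
    auto addr = std::make_shared<Node>();
    addr->table_to_file = {{"city", "col_b_0"}};
    auto root = std::make_shared<Node>();
    root->table_to_file = {{"id", "col_a"}, {"addr", "col_b"}};
    root->children = {{"addr", addr}};
    convert_column(root, "id");
    convert_column(root, "addr");
    return 0;
}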
@@ -623,38 +629,35 @@ class OrcReader : public GenericReader {
     int64_t _range_start_offset;
     int64_t _range_size;
     const std::string& _ctz;
-    const std::vector<std::string>* _column_names;
-    // _missing_column_names_set: used in iceberg/hudi/paimon, the columns are dropped
-    // but added back(drop column a then add column a). Shouldn't read this column data in this case.
-    std::set<std::string> _missing_column_names_set;
 
     int32_t _offset_days = 0;
     cctz::time_zone _time_zone;
 
-    std::list<std::string> _read_cols;
-    std::list<std::string> _read_cols_lower_case;
+    // The columns of the table to be read (contain columns that do not exist)
+    const std::vector<std::string>* _table_column_names;
+
+    // The columns of the file to be read (file column name)
+    std::list<std::string> _read_file_cols;
+
+    // The columns of the table to be read (table column name)
+    std::list<std::string> _read_table_cols;
+
+    // _read_table_cols + _missing_cols = _table_column_names
     std::list<std::string> _missing_cols;
 
+    // file column name to std::vector<orc::ColumnVectorBatch*> idx.
     std::unordered_map<std::string, int> _colname_to_idx;
-    // Column name in Orc file after removed acid(remove row.) to column name to schema.
-    // This is used for Hive 1.x which use internal column name in Orc file.
-    // _col0, _col1...
-    std::unordered_map<std::string, std::string> _removed_acid_file_col_name_to_schema_col;
-    // Flag for hive engine.
-    // 1. True if the external table engine is Hive1.x with orc col name as _col1, col2, ...
-    // 2. If true, use indexes instead of column names when reading orc tables.
-    bool _is_hive1_orc_or_use_idx = false;
 
-    // map col name in metastore to col name in orc file
-    std::unordered_map<std::string, std::string> _col_name_to_file_col_name;
-    // map col name in orc file to orc type
+    // file column name to orc type
     std::unordered_map<std::string, const orc::Type*> _type_map;
-    std::vector<const orc::Type*> _col_orc_type;
 
     std::unique_ptr<ORCFileInputStream> _file_input_stream;
     Statistics _statistics;
     OrcProfile _orc_profile;
     orc::ReaderMetrics _reader_metrics;
 
     std::unique_ptr<orc::ColumnVectorBatch> _batch;
-    std::unique_ptr<orc::Reader> _reader;
+    std::unique_ptr<orc::Reader> _reader = nullptr;
     std::unique_ptr<orc::RowReader> _row_reader;
     std::unique_ptr<ORCFilterImpl> _orc_filter;
     orc::RowReaderOptions _row_reader_options;
@@ -688,14 +691,10 @@ class OrcReader : public GenericReader {
     std::shared_ptr<ObjectPool> _obj_pool;
     std::unique_ptr<StringDictFilterImpl> _string_dict_filter;
     bool _dict_cols_has_converted = false;
-    bool _has_complex_type = false;
 
-    // resolve schema change
+    // resolve schema type change
     std::unordered_map<std::string, std::unique_ptr<converter::ColumnTypeConverter>> _converters;
-    //for iceberg table , when table column name != file column name
-    //TODO(CXY) : remove _table_col_to_file_col,because we hava _col_name_to_file_col_name, the two have the same effect.
-    std::unordered_map<std::string, std::string> _table_col_to_file_col;
 
     //support iceberg position delete .
     std::vector<int64_t>* _position_delete_ordered_rowids = nullptr;
     std::unordered_map<const VSlotRef*, orc::PredicateDataType>
@@ -709,6 +708,10 @@ class OrcReader : public GenericReader {
 
     std::pair<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>, int>
             _row_id_column_iterator_pair = {nullptr, -1};
+
+    // Through this node, you can find the file column based on the table column.
+    std::shared_ptr<TableSchemaChangeHelper::Node> _table_info_node_ptr =
+            TableSchemaChangeHelper::ConstNode::get_instance();
 };
 
 class StripeStreamInputStream : public orc::InputStream, public ProfileCollector {
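The renamed members encode the invariant spelled out in the new comment: the requested _table_column_names split into _read_table_cols (resolvable in the file, possibly under a different file name) and _missing_cols (filled from defaults or partition values). A small sketch of that partition step, with invented inputs:

#include <cassert>
#include <list>
#include <set>
#include <string>
#include <vector>

int main() {
    // Columns the query requests, in table terms.
    const std::vector<std::string> table_column_names = {"id", "name", "added_later"};
    // Columns the mapping node can resolve to a file column.
    const std::set<std::string> resolvable = {"id", "name"};

    std::list<std::string> read_table_cols;
    std::list<std::string> missing_cols;
    for (const auto& col : table_column_names) {
        if (resolvable.count(col) != 0) {
            read_table_cols.push_back(col);
        } else {
            missing_cols.push_back(col);
        }
    }
    // The invariant from the header: _read_table_cols + _missing_cols = _table_column_names
    assert(read_table_cols.size() + missing_cols.size() == table_column_names.size());
    return 0;
}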
25 changes: 8 additions & 17 deletions be/src/vec/exec/format/parquet/schema_desc.cpp
@@ -32,6 +32,7 @@
 #include "vec/data_types/data_type_factory.hpp"
 #include "vec/data_types/data_type_map.h"
 #include "vec/data_types/data_type_struct.h"
+#include "vec/exec/format/table/table_format_reader.h"
 
 namespace doris::vectorized {
 #include "common/compile_check_begin.h"
@@ -137,15 +138,13 @@ Status FieldDescriptor::parse_from_thrift(const std::vector<tparquet::SchemaElem
     auto& root_schema = t_schemas[0];
     _fields.resize(root_schema.num_children);
     _next_schema_pos = 1;
+
     for (int i = 0; i < root_schema.num_children; ++i) {
         RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos, &_fields[i]));
+        if (_name_to_field.find(_fields[i].name) != _name_to_field.end()) {
+            return Status::InvalidArgument("Duplicated field name: {}", _fields[i].name);
+        }
         _name_to_field.emplace(_fields[i].name, &_fields[i]);
-        if (_fields[i].field_id != -1) {
-            _field_id_name_mapping.emplace(_fields[i].field_id, _fields[i].name);
-        }
     }
 
     if (_next_schema_pos != t_schemas.size()) {
@@ -156,14 +155,6 @@ Status FieldDescriptor::parse_from_thrift(const std::vector<tparquet::SchemaElem
     return Status::OK();
 }
 
-const doris::Slice FieldDescriptor::get_column_name_from_field_id(int32_t id) const {
-    auto const it = _field_id_name_mapping.find(id);
-    if (it == _field_id_name_mapping.end()) {
-        return {};
-    }
-    return doris::Slice {it->second.data()};
-}
-
 Status FieldDescriptor::parse_node_field(const std::vector<tparquet::SchemaElement>& t_schemas,
                                          size_t curr_pos, FieldSchema* node_field) {
     if (curr_pos >= t_schemas.size()) {
@@ -184,7 +175,7 @@ Status FieldDescriptor::parse_node_field(const std::vector<tparquet::SchemaEleme
         auto child = &node_field->children[0];
         parse_physical_field(t_schema, false, child);
 
-        node_field->name = to_lower(t_schema.name);
+        node_field->name = t_schema.name;
         node_field->data_type = std::make_shared<DataTypeArray>(make_nullable(child->data_type));
         _next_schema_pos = curr_pos + 1;
         node_field->field_id = t_schema.__isset.field_id ? t_schema.field_id : -1;
@@ -201,7 +192,7 @@ Status FieldDescriptor::parse_node_field(const std::vector<tparquet::SchemaEleme
 
 void FieldDescriptor::parse_physical_field(const tparquet::SchemaElement& physical_schema,
                                            bool is_nullable, FieldSchema* physical_field) {
-    physical_field->name = to_lower(physical_schema.name);
+    physical_field->name = physical_schema.name;
     physical_field->parquet_schema = physical_schema;
     physical_field->physical_type = physical_schema.type;
     _physical_fields.push_back(physical_field);
@@ -467,7 +458,7 @@ Status FieldDescriptor::parse_group_field(const std::vector<tparquet::SchemaElem
         // produce a non-null list<struct>
         RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos, struct_field));
 
-        group_field->name = to_lower(group_schema.name);
+        group_field->name = group_schema.name;
         group_field->data_type =
                 std::make_shared<DataTypeArray>(make_nullable(struct_field->data_type));
         group_field->field_id = group_schema.__isset.field_id ? group_schema.field_id : -1;
@@ -535,7 +526,7 @@ Status FieldDescriptor::parse_list_field(const std::vector<tparquet::SchemaEleme
         _next_schema_pos = curr_pos + 2;
     }
 
-    list_field->name = to_lower(first_level.name);
+    list_field->name = first_level.name;
     list_field->data_type =
             std::make_shared<DataTypeArray>(make_nullable(list_field->children[0].data_type));
     if (is_optional) {
@@ -601,7 +592,7 @@ Status FieldDescriptor::parse_map_field(const std::vector<tparquet::SchemaElemen
     // produce MAP<STRUCT<KEY, VALUE>>
     RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos + 1, map_kv_field));
 
-    map_field->name = to_lower(map_schema.name);
+    map_field->name = map_schema.name;
     map_field->data_type = std::make_shared<DataTypeMap>(
             make_nullable(assert_cast<const DataTypeStruct*>(
                                   remove_nullable(map_kv_field->data_type).get())
@@ -632,7 +623,7 @@ Status FieldDescriptor::parse_struct_field(const std::vector<tparquet::SchemaEle
     for (int i = 0; i < num_children; ++i) {
         RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos, &struct_field->children[i]));
     }
-    struct_field->name = to_lower(struct_schema.name);
+    struct_field->name = struct_schema.name;
 
     struct_field->field_id = struct_schema.__isset.field_id ? struct_schema.field_id : -1;
     DataTypes res_data_types;
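Every to_lower on a parsed Parquet field name is gone, so FieldDescriptor now preserves the file's original casing; catalogs that need case-insensitive matching build it at the mapping layer instead of losing the original names at parse time. A sketch of that lookup living outside the parser (illustrative only):

#include <algorithm>
#include <cctype>
#include <iostream>
#include <string>
#include <unordered_map>

static std::string to_lower(const std::string& s) {
    std::string out = s;
    std::transform(out.begin(), out.end(), out.begin(),
                   [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
    return out;
}

int main() {
    // The parsed file schema keeps its original casing.
    const std::unordered_map<std::string, int> file_fields = {{"UserId", 0}, {"Name", 1}};

    // A case-insensitive catalog derives its own lowered index rather than
    // mutating the parsed names.
    std::unordered_map<std::string, std::string> lower_to_file;
    for (const auto& [name, idx] : file_fields) {
        lower_to_file.emplace(to_lower(name), name);
    }
    std::cout << "userid -> " << lower_to_file.at("userid") << "\n";  // prints UserId
    return 0;
}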
11 changes: 4 additions & 7 deletions be/src/vec/exec/format/parquet/schema_desc.h
@@ -28,6 +28,7 @@
 
 #include "common/cast_set.h"
 #include "common/status.h"
+#include "gen_cpp/Planner_types.h"
 #include "runtime/types.h"
 #include "util/slice.h"
 #include "vec/data_types/data_type.h"
@@ -60,7 +61,7 @@ struct FieldSchema {
     FieldSchema(const FieldSchema& fieldSchema) = default;
     std::string debug_string() const;
 
-    int32_t field_id;
+    int32_t field_id = -1;
 };
 
 class FieldDescriptor {
@@ -73,8 +74,8 @@ class FieldDescriptor {
     std::unordered_map<std::string, const FieldSchema*> _name_to_field;
     // Used in from_thrift, marking the next schema position that should be parsed
     size_t _next_schema_pos;
-    std::map<int32_t, std::string> _field_id_name_mapping;
 
+private:
     void parse_physical_field(const tparquet::SchemaElement& physical_schema, bool is_nullable,
                               FieldSchema* physical_field);
 
@@ -135,11 +136,7 @@ class FieldDescriptor {
 
     int32_t size() const { return cast_set<int32_t>(_fields.size()); }
 
-    bool has_parquet_field_id() const { return !_field_id_name_mapping.empty(); }
-
-    std::map<int32_t, std::string> get_field_id_name_map() { return _field_id_name_mapping; }
-
-    const doris::Slice get_column_name_from_field_id(int32_t id) const;
     const std::vector<FieldSchema>& get_fields_schema() const { return _fields; }
 };
 #include "common/compile_check_end.h"
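Defaulting field_id to -1 in the struct makes "no Parquet field ID" an explicit state instead of an uninitialized read, and it matches the parser's t_schema.__isset.field_id ? t_schema.field_id : -1 branches. A minimal illustration:

#include <cstdint>
#include <iostream>

struct FieldSchema {
    // -1 is the sentinel for "the file carries no Parquet field IDs".
    int32_t field_id = -1;
};

int main() {
    FieldSchema f;  // previously field_id could be read uninitialized here
    std::cout << (f.field_id == -1 ? "no field id" : "has field id") << "\n";
    return 0;
}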