Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ void OrcReader::_init_profile() {
}

Status OrcReader::_create_file_reader() {
if (_reader != nullptr) {
return Status::OK();
}

if (_file_input_stream == nullptr) {
_file_description.mtime =
_scan_range.__isset.modification_time ? _scan_range.modification_time : 0;
Expand Down Expand Up @@ -283,13 +287,15 @@ Status OrcReader::_create_file_reader() {

Status OrcReader::init_reader(
const std::vector<std::string>* column_names,
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
const std::vector<std::string>& missing_column_names,
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
const VExprContextSPtrs& conjuncts, bool is_acid, const TupleDescriptor* tuple_descriptor,
const RowDescriptor* row_descriptor,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts,
const bool hive_use_column_names) {
_column_names = column_names;
_missing_column_names_set.insert(missing_column_names.begin(), missing_column_names.end());
_colname_to_value_range = colname_to_value_range;
_lazy_read_ctx.conjuncts = conjuncts;
_is_acid = is_acid;
Expand Down Expand Up @@ -326,14 +332,21 @@ Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
}

// Reads the top-level column names and an integer-valued type attribute
// (e.g. the iceberg field id) from the ORC file schema.
// - col_names: receives the lower-cased name of every top-level column visited.
// - col_attributes: receives std::stoi(<attribute value>) for each column.
// - attribute: the ORC type-attribute key to look up on each subtype.
// - exist_attribute: set to false and the scan stops early as soon as one
//   column lacks the attribute; stays true otherwise.
// Returns an error status if the file reader cannot be created.
Status OrcReader::get_schema_col_name_attribute(std::vector<std::string>* col_names,
                                                std::vector<int32_t>* col_attributes,
                                                const std::string& attribute,
                                                bool* exist_attribute) {
    RETURN_IF_ERROR(_create_file_reader());
    *exist_attribute = true;
    const auto& root_type = _reader->getType();
    // getSubtypeCount() returns uint64_t; use the same type for the index to
    // avoid a signed/unsigned comparison.
    for (uint64_t i = 0; i < root_type.getSubtypeCount(); ++i) {
        col_names->emplace_back(get_field_name_lower_case(&root_type, i));

        if (!root_type.getSubtype(i)->hasAttributeKey(attribute)) {
            *exist_attribute = false;
            return Status::OK();
        }
        col_attributes->emplace_back(
                std::stoi(root_type.getSubtype(i)->getAttributeValue(attribute)));
    }
    return Status::OK();
}
Expand All @@ -349,8 +362,15 @@ Status OrcReader::_init_read_columns() {
// TODO, should be removed in 2.2 or later
_is_hive1_orc_or_use_idx = (is_hive1_orc || _is_hive1_orc_or_use_idx) &&
_scan_params.__isset.slot_name_to_schema_pos;

for (size_t i = 0; i < _column_names->size(); ++i) {
auto& col_name = (*_column_names)[i];

if (_missing_column_names_set.contains(col_name)) {
_missing_cols.emplace_back(col_name);
continue;
}

if (_is_hive1_orc_or_use_idx) {
auto iter = _scan_params.slot_name_to_schema_pos.find(col_name);
if (iter != _scan_params.slot_name_to_schema_pos.end()) {
Expand Down
15 changes: 10 additions & 5 deletions be/src/vec/exec/format/orc/vorc_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ class OrcReader : public GenericReader {
//If you want to read the file by index instead of column name, set hive_use_column_names to false.
Status init_reader(
const std::vector<std::string>* column_names,
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
const std::vector<std::string>& missing_column_names,
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
const VExprContextSPtrs& conjuncts, bool is_acid,
const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
Expand Down Expand Up @@ -178,8 +179,8 @@ class OrcReader : public GenericReader {
std::vector<TypeDescriptor>* col_types) override;

Status get_schema_col_name_attribute(std::vector<std::string>* col_names,
std::vector<uint64_t>* col_attributes,
std::string attribute);
std::vector<int32_t>* col_attributes,
const std::string& attribute, bool* exist_attribute);
void set_table_col_to_file_col(
std::unordered_map<std::string, std::string> table_col_to_file_col) {
_table_col_to_file_col = table_col_to_file_col;
Expand Down Expand Up @@ -577,6 +578,10 @@ class OrcReader : public GenericReader {
int64_t _range_size;
const std::string& _ctz;
const std::vector<std::string>* _column_names;
// _missing_column_names_set: used for iceberg/hudi/paimon when a column was
// dropped and then re-added (drop column a, then add column a). The data for
// such a column must not be read from this file.
std::set<std::string> _missing_column_names_set;

int32_t _offset_days = 0;
cctz::time_zone _time_zone;

Expand Down Expand Up @@ -604,7 +609,7 @@ class OrcReader : public GenericReader {
orc::ReaderMetrics _reader_metrics;

std::unique_ptr<orc::ColumnVectorBatch> _batch;
std::unique_ptr<orc::Reader> _reader;
std::unique_ptr<orc::Reader> _reader = nullptr;
std::unique_ptr<orc::RowReader> _row_reader;
std::unique_ptr<ORCFilterImpl> _orc_filter;
orc::RowReaderOptions _row_reader_options;
Expand All @@ -618,7 +623,7 @@ class OrcReader : public GenericReader {
std::vector<DecimalScaleParams> _decimal_scale_params;
size_t _decimal_scale_params_index;

std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range = nullptr;
bool _is_acid = false;
std::unique_ptr<IColumn::Filter> _filter;
LazyReadContext _lazy_read_ctx;
Expand Down
4 changes: 3 additions & 1 deletion be/src/vec/exec/format/parquet/schema_desc.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class FieldDescriptor {
std::unordered_map<std::string, const FieldSchema*> _name_to_field;
// Used in from_thrift, marking the next schema position that should be parsed
size_t _next_schema_pos;
std::unordered_map<int, std::string> _field_id_name_mapping;
std::map<int, std::string> _field_id_name_mapping;

void parse_physical_field(const tparquet::SchemaElement& physical_schema, bool is_nullable,
FieldSchema* physical_field);
Expand Down Expand Up @@ -135,6 +135,8 @@ class FieldDescriptor {

bool has_parquet_field_id() const { return _field_id_name_mapping.size() > 0; }

std::map<int32, std::string> get_field_id_name_map() { return _field_id_name_mapping; }

const doris::Slice get_column_name_from_field_id(int32_t id) const;
};

Expand Down
190 changes: 41 additions & 149 deletions be/src/vec/exec/format/table/iceberg_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,32 +120,9 @@ Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool*
}
RETURN_IF_ERROR(_expand_block_if_need(block));

// To support iceberg schema evolution. We change the column name in block to
// make it match with the column name in parquet file before reading data. and
// Set the name back to table column name before return this block.
if (_has_schema_change) {
for (int i = 0; i < block->columns(); i++) {
ColumnWithTypeAndName& col = block->get_by_position(i);
auto iter = _table_col_to_file_col.find(col.name);
if (iter != _table_col_to_file_col.end()) {
col.name = iter->second;
}
}
block->initialize_index_by_name();
}

RETURN_IF_ERROR(TableSchemaChangeHelper::get_next_block_before(block));
RETURN_IF_ERROR(_file_format_reader->get_next_block(block, read_rows, eof));
// Set the name back to table column name before return this block.
if (_has_schema_change) {
for (int i = 0; i < block->columns(); i++) {
ColumnWithTypeAndName& col = block->get_by_position(i);
auto iter = _file_col_to_table_col.find(col.name);
if (iter != _file_col_to_table_col.end()) {
col.name = iter->second;
}
}
block->initialize_index_by_name();
}
RETURN_IF_ERROR(TableSchemaChangeHelper::get_next_block_after(block));

if (_equality_delete_impl != nullptr) {
RETURN_IF_ERROR(_equality_delete_impl->filter_data_block(block));
Expand Down Expand Up @@ -228,8 +205,9 @@ Status IcebergTableReader::_equality_delete_base(
not_in_file_col_names, nullptr, {}, nullptr,
nullptr, nullptr, nullptr, nullptr, false));
} else if (auto* orc_reader = typeid_cast<OrcReader*>(delete_reader.get())) {
RETURN_IF_ERROR(orc_reader->init_reader(&equality_delete_col_names, nullptr, {}, false,
{}, {}, nullptr, nullptr));
RETURN_IF_ERROR(orc_reader->init_reader(&equality_delete_col_names,
not_in_file_col_names, nullptr, {}, false, {},
{}, nullptr, nullptr));
} else {
return Status::InternalError("Unsupported format of delete file");
}
Expand Down Expand Up @@ -445,60 +423,6 @@ void IcebergTableReader::_sort_delete_rows(std::vector<std::vector<int64_t>*>& d
}
}

/*
 * Build _all_required_col_names and _not_in_file_col_names.
 *
 * _all_required_col_names holds every column required by the user sql; when a
 * column was renamed after the data file was written, the old in-file name is
 * stored instead of the table name.
 *
 * _not_in_file_col_names holds required columns that are absent from the data
 * file (e.g. columns added after the file was written). A column re-added under
 * a name that an old dropped column used is also treated as missing and goes
 * into _not_in_file_col_names.
 */
void IcebergTableReader::_gen_file_col_names() {
    _all_required_col_names.clear();
    _not_in_file_col_names.clear();
    for (const auto& table_col : _file_col_names) {
        auto mapped = _table_col_to_file_col.find(table_col);
        if (mapped != _table_col_to_file_col.end()) {
            // Known rename: read the file under its in-file name.
            _all_required_col_names.emplace_back(mapped->second);
            continue;
        }
        // No mapping entry. This happens when the user created the iceberg
        // table by directly appending a pre-existing parquet file, so the
        // footer has no 'iceberg.schema' field and _table_col_to_file_col may
        // be empty. Since matching ignores case, fall back to the lowercase
        // name here.
        auto lower_name = to_lower(table_col);
        _all_required_col_names.emplace_back(lower_name);
        if (_has_iceberg_schema) {
            _not_in_file_col_names.emplace_back(table_col);
        } else {
            _table_col_to_file_col.emplace(table_col, lower_name);
            _file_col_to_table_col.emplace(lower_name, table_col);
            if (table_col != lower_name) {
                _has_schema_change = true;
            }
        }
    }
}

/*
 * Generate _new_colname_to_value_range by replacing each table column name in
 * _colname_to_value_range with the corresponding column name in the data file
 * (when a mapping exists in _table_col_to_file_col); unmapped names are kept.
 */
void IcebergTableReader::_gen_new_colname_to_value_range() {
    // Range-for with structured bindings instead of a manual iterator loop.
    for (const auto& [table_col, value_range] : *_colname_to_value_range) {
        auto iter = _table_col_to_file_col.find(table_col);
        if (iter == _table_col_to_file_col.end()) {
            _new_colname_to_value_range.emplace(table_col, value_range);
        } else {
            _new_colname_to_value_range.emplace(iter->second, value_range);
        }
    }
}

void IcebergTableReader::_gen_position_delete_file_range(Block& block, DeleteFile* position_delete,
size_t read_rows,
bool file_path_column_dictionary_coded) {
Expand Down Expand Up @@ -544,13 +468,9 @@ Status IcebergParquetReader::init_reader(
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
_file_format = Fileformat::PARQUET;
ParquetReader* parquet_reader = static_cast<ParquetReader*>(_file_format_reader.get());
_col_id_name_map = col_id_name_map;
_file_col_names = file_col_names;
_colname_to_value_range = colname_to_value_range;
FieldDescriptor field_desc = parquet_reader->get_file_metadata_schema();
RETURN_IF_ERROR(_gen_col_name_maps(field_desc));
_gen_file_col_names();
_gen_new_colname_to_value_range();
RETURN_IF_ERROR(TableSchemaChangeHelper::init_schema_info(file_col_names, col_id_name_map,
colname_to_value_range));

parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);
parquet_reader->iceberg_sanitize(_all_required_col_names);
RETURN_IF_ERROR(init_row_filters(_range, _io_ctx));
Expand Down Expand Up @@ -617,27 +537,24 @@ Status IcebergOrcReader::init_reader(
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
_file_format = Fileformat::ORC;
auto* orc_reader = static_cast<OrcReader*>(_file_format_reader.get());
_col_id_name_map = col_id_name_map;
_file_col_names = file_col_names;
_colname_to_value_range = colname_to_value_range;

RETURN_IF_ERROR(_gen_col_name_maps(orc_reader));
_gen_file_col_names();
_gen_new_colname_to_value_range();
RETURN_IF_ERROR(TableSchemaChangeHelper::init_schema_info(file_col_names, col_id_name_map,
colname_to_value_range));
orc_reader->set_table_col_to_file_col(_table_col_to_file_col);
RETURN_IF_ERROR(init_row_filters(_range, _io_ctx));
return orc_reader->init_reader(&_all_required_col_names, &_new_colname_to_value_range,
conjuncts, false, tuple_descriptor, row_descriptor,
not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts);
return orc_reader->init_reader(&_all_required_col_names, _not_in_file_col_names,
&_new_colname_to_value_range, conjuncts, false, tuple_descriptor,
row_descriptor, not_single_slot_filter_conjuncts,
slot_id_to_filter_conjuncts);
}

Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete_range,
DeleteFile* position_delete) {
OrcReader orc_delete_reader(_profile, _state, _params, *delete_range,
READ_DELETE_FILE_BATCH_SIZE, _state->timezone(), _io_ctx);
std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range;
RETURN_IF_ERROR(orc_delete_reader.init_reader(&delete_file_col_names, &colname_to_value_range,
{}, false, {}, {}, nullptr, nullptr));
RETURN_IF_ERROR(orc_delete_reader.init_reader(&delete_file_col_names, {},
&colname_to_value_range, {}, false, {}, {},
nullptr, nullptr));

std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
partition_columns;
Expand All @@ -658,61 +575,36 @@ Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete
return Status::OK();
}

/*
 * To support schema evolution, Iceberg writes a column-id -> column-name map
 * into the parquet file's key_value_metadata. Export that map from the file
 * footer so the caller can compare it with the table schema from FE.
 * Sets exist_schema = false when the file carries no parquet field-id
 * information at all.
 */
Status IcebergParquetReader::get_file_col_id_to_name(
        bool& exist_schema, std::map<int32_t, std::string>& file_col_id_to_name) {
    auto* parquet_reader = static_cast<ParquetReader*>(_file_format_reader.get());
    FieldDescriptor field_desc = parquet_reader->get_file_metadata_schema();

    if (field_desc.has_parquet_field_id()) {
        file_col_id_to_name = field_desc.get_field_id_name_map();
    } else {
        // Early iceberg versions don't write any schema information to the
        // parquet file.
        exist_schema = false;
    }

    return Status::OK();
}

/*
 * To support schema evolution, Iceberg writes the column id into each ORC
 * column's type attributes (ICEBERG_ORC_ATTRIBUTE). Collect the id -> name
 * mapping so the caller can compare it with the table schema from FE.
 * Sets exist_schema = false when any column lacks the id attribute.
 */
Status IcebergOrcReader::get_file_col_id_to_name(
        bool& exist_schema, std::map<int32_t, std::string>& file_col_id_to_name) {
    auto* orc_reader = static_cast<OrcReader*>(_file_format_reader.get());

    std::vector<std::string> col_names;
    std::vector<int32_t> col_ids;
    RETURN_IF_ERROR(orc_reader->get_schema_col_name_attribute(
            &col_names, &col_ids, ICEBERG_ORC_ATTRIBUTE, &exist_schema));
    if (!exist_schema) {
        return Status::OK();
    }
    // col_names and col_ids are filled pairwise; move the names into the map.
    // size_t index avoids the signed/unsigned comparison of `auto i = 0`.
    for (size_t i = 0; i < col_names.size(); ++i) {
        file_col_id_to_name.emplace(col_ids[i], std::move(col_names[i]));
    }
    return Status::OK();
}
Expand Down
Loading
Loading