From 5b9cd5b57f67435967594fb5aa52b7058fdd3d82 Mon Sep 17 00:00:00 2001 From: daidai <2017501503@qq.com> Date: Sat, 27 Jul 2024 02:21:17 +0800 Subject: [PATCH 1/4] [feature](hive)Support reading renamed Parquet Hive and Orc Hive tables. --- be/src/vec/exec/format/orc/vorc_reader.cpp | 48 +++++++--- be/src/vec/exec/format/orc/vorc_reader.h | 27 +++--- .../exec/format/parquet/vparquet_reader.cpp | 89 ++++++++++++++----- .../vec/exec/format/parquet/vparquet_reader.h | 3 +- .../vec/exec/format/table/iceberg_reader.cpp | 8 +- be/src/vec/exec/scan/vfile_scanner.cpp | 15 +++- .../datasource/hive/source/HiveScanNode.java | 16 ++++ .../org/apache/doris/qe/SessionVariable.java | 21 ++++- gensrc/thrift/PaloInternalService.thrift | 6 ++ 9 files changed, 183 insertions(+), 50 deletions(-) diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index e2ba3a57be8c9a..466e3525a3a964 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -277,13 +277,15 @@ Status OrcReader::init_reader( const VExprContextSPtrs& conjuncts, bool is_acid, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, const VExprContextSPtrs* not_single_slot_filter_conjuncts, - const std::unordered_map* slot_id_to_filter_conjuncts) { + const std::unordered_map* slot_id_to_filter_conjuncts, + const bool hive_use_column_names) { _column_names = column_names; _colname_to_value_range = colname_to_value_range; _lazy_read_ctx.conjuncts = conjuncts; _is_acid = is_acid; _tuple_descriptor = tuple_descriptor; _row_descriptor = row_descriptor; + _is_hive1_orc_or_use_idx = !hive_use_column_names; if (not_single_slot_filter_conjuncts != nullptr && !not_single_slot_filter_conjuncts->empty()) { _not_single_slot_filter_conjuncts.insert(_not_single_slot_filter_conjuncts.end(), not_single_slot_filter_conjuncts->begin(), @@ -333,12 +335,14 @@ Status OrcReader::_init_read_columns() { bool is_hive1_orc = false; _init_orc_cols(root_type, orc_cols, orc_cols_lower_case, _type_map, &is_hive1_orc); +// std::cout << "_scan_params.column_idxs.size() => "<< _scan_params.column_idxs.size() <<"\n"; +// std :: cout << "_scan_params.__isset.slot_name_to_schema_pos =>" << _scan_params.__isset.slot_name_to_schema_pos <<"\n"; // In old version slot_name_to_schema_pos may not be set in _scan_params // TODO, should be removed in 2.2 or later - _is_hive1_orc = is_hive1_orc && _scan_params.__isset.slot_name_to_schema_pos; + _is_hive1_orc_or_use_idx = (is_hive1_orc || _is_hive1_orc_or_use_idx) && _scan_params.__isset.slot_name_to_schema_pos; for (size_t i = 0; i < _column_names->size(); ++i) { auto& col_name = (*_column_names)[i]; - if (_is_hive1_orc) { + if (_is_hive1_orc_or_use_idx) { auto iter = _scan_params.slot_name_to_schema_pos.find(col_name); if (iter != _scan_params.slot_name_to_schema_pos.end()) { int pos = iter->second; @@ -373,10 +377,13 @@ Status OrcReader::_init_read_columns() { _read_cols_lower_case.emplace_back(col_name); // For hive engine, store the orc column name to schema column name map. // This is for Hive 1.x orc file with internal column name _col0, _col1... 
- if (_is_hive1_orc) { + if (_is_hive1_orc_or_use_idx) { _removed_acid_file_col_name_to_schema_col[orc_cols[pos]] = col_name; } - _col_name_to_file_col_name[col_name] = read_col; + + if (!_provide_column_name_mapping){ + _col_name_to_file_col_name[col_name] = read_col; + } } } return Status::OK(); @@ -706,8 +713,10 @@ bool OrcReader::_init_search_argument( if (iter == colname_to_value_range->end()) { continue; } - auto type_it = type_map.find(col_name); +// auto type_it = type_map.find(col_name); + auto type_it = type_map.find(_col_name_to_file_col_name[col_name]); if (type_it == type_map.end()) { + std::cout <<"no found\n"; continue; } std::visit( @@ -744,10 +753,10 @@ Status OrcReader::set_fill_columns( std::function visit_slot = [&](VExpr* expr) { if (VSlotRef* slot_ref = typeid_cast(expr)) { auto expr_name = slot_ref->expr_name(); - auto iter = _table_col_to_file_col.find(expr_name); - if (iter != _table_col_to_file_col.end()) { - expr_name = iter->second; - } +// auto iter = _table_col_to_file_col.find(expr_name); +// if (iter != _table_col_to_file_col.end()) { +// expr_name = iter->second; +// } predicate_columns.emplace(expr_name, std::make_pair(slot_ref->column_id(), slot_ref->slot_id())); if (slot_ref->column_id() == 0) { @@ -911,7 +920,7 @@ Status OrcReader::_init_select_types(const orc::Type& type, int idx) { std::string name; // For hive engine, translate the column name in orc file to schema column name. // This is for Hive 1.x which use internal column name _col0, _col1... - if (_is_hive1_orc) { + if (_is_hive1_orc_or_use_idx) { name = _removed_acid_file_col_name_to_schema_col[type.getFieldName(i)]; } else { name = get_field_name_lower_case(&type, i); @@ -1538,6 +1547,17 @@ std::string OrcReader::get_field_name_lower_case(const orc::Type* orc_type, int } Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { + +// auto origin_names = block->get_names(); +// for(auto i = 0; i< block->get_names().size();i++){ +// auto& col =block->get_by_position(i); +// if (_table_col_to_file_col.contains(col.name)){ +// col.name = _table_col_to_file_col[col.name]; +// } +// } +// block->initialize_index_by_name(); + + RETURN_IF_ERROR(get_next_block_impl(block, read_rows, eof)); if (_orc_filter) { RETURN_IF_ERROR(_orc_filter->get_status()); @@ -1545,6 +1565,12 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { if (_string_dict_filter) { RETURN_IF_ERROR(_string_dict_filter->get_status()); } + +// for(auto i = 0; i< block->columns();i++){ +// block->get_by_position(i).name = origin_names[i]; +// } +// block->initialize_index_by_name(); + return Status::OK(); } diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index 77eec261b0109e..1ce13983c1a970 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -139,14 +139,15 @@ class OrcReader : public GenericReader { const std::string& ctz, io::IOContext* io_ctx, bool enable_lazy_mat = true); ~OrcReader() override; - + //If you want to read the file by index instead of column name, set hive_use_column_names to false. 
Status init_reader( const std::vector* column_names, std::unordered_map* colname_to_value_range, const VExprContextSPtrs& conjuncts, bool is_acid, const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor, const VExprContextSPtrs* not_single_slot_filter_conjuncts, - const std::unordered_map* slot_id_to_filter_conjuncts); + const std::unordered_map* slot_id_to_filter_conjuncts, + const bool hive_use_column_names = true); Status set_fill_columns( const std::unordered_map>& @@ -183,9 +184,10 @@ class OrcReader : public GenericReader { Status get_schema_col_name_attribute(std::vector* col_names, std::vector* col_attributes, std::string attribute); - void set_table_col_to_file_col( - std::unordered_map table_col_to_file_col) { - _table_col_to_file_col = table_col_to_file_col; + void set_col_name_to_file_col_name( + std::unordered_map col_name_to_file_col_name) { + _provide_column_name_mapping = true; + _col_name_to_file_col_name = col_name_to_file_col_name; } void set_position_delete_rowids(vector* delete_rows) { @@ -570,9 +572,16 @@ class OrcReader : public GenericReader { // This is used for Hive 1.x which use internal column name in Orc file. // _col0, _col1... std::unordered_map _removed_acid_file_col_name_to_schema_col; - // Flag for hive engine. True if the external table engine is Hive1.x with orc col name - // as _col1, col2, ... - bool _is_hive1_orc = false; + // Flag for hive engine. + // 1. True if the external table engine is Hive1.x with orc col name as _col1, col2, ... + // 2. If true, use indexes instead of column names when reading orc tables. + bool _is_hive1_orc_or_use_idx = false; + + // Have you provided a mapping from the table column name to the file column name, + // i.e. set_col_name_to_file_col_name() ? + // If provide_column_name_mapping is true, it means that the mapping you provided will be used. + // Iceberg reader should provide such mapping. + bool _provide_column_name_mapping = false; std::unordered_map _col_name_to_file_col_name; std::unordered_map _type_map; std::vector _col_orc_type; @@ -620,8 +629,6 @@ class OrcReader : public GenericReader { // resolve schema change std::unordered_map> _converters; - //for iceberg table , when table column name != file column name - std::unordered_map _table_col_to_file_col; //support iceberg position delete . 
std::vector* _position_delete_ordered_rowids = nullptr; }; diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 25421d80b0edf2..ebcf3ba3fb20ee 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -300,12 +300,14 @@ Status ParquetReader::init_reader( const std::unordered_map* colname_to_slot_id, const VExprContextSPtrs* not_single_slot_filter_conjuncts, const std::unordered_map* slot_id_to_filter_conjuncts, - bool filter_groups) { + bool filter_groups, const bool hive_use_column_names) { _tuple_descriptor = tuple_descriptor; _row_descriptor = row_descriptor; _colname_to_slot_id = colname_to_slot_id; _not_single_slot_filter_conjuncts = not_single_slot_filter_conjuncts; _slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts; + _colname_to_value_range = colname_to_value_range; + _hive_use_column_names = hive_use_column_names; if (_file_metadata == nullptr) { return Status::InternalError("failed to init parquet reader, please open reader first"); } @@ -319,29 +321,57 @@ Status ParquetReader::init_reader( // missing_column_names are the columns required by user sql but not in the parquet file, // e.g. table added a column after this parquet file was written. _column_names = &all_column_names; - auto schema_desc = _file_metadata->schema(); - std::set required_columns(all_column_names.begin(), all_column_names.end()); - // Currently only used in iceberg, the columns are dropped but added back - std::set dropped_columns(missing_column_names.begin(), missing_column_names.end()); - // Make the order of read columns the same as physical order in parquet file - for (int i = 0; i < schema_desc.size(); ++i) { - auto name = schema_desc.get_column(i)->name; - // If the column in parquet file is included in all_column_names and not in missing_column_names, - // add it to _map_column, which means the reader should read the data of this column. - // Here to check against missing_column_names is for the 'Add a column back to the table - // with the same column name' case. (drop column a then add column a). - // Shouldn't read this column data in this case. - if (required_columns.find(name) != required_columns.end() && - dropped_columns.find(name) == dropped_columns.end()) { - required_columns.erase(name); - _read_columns.emplace_back(name); + + if (_hive_use_column_names) { + auto schema_desc = _file_metadata->schema(); + std::set required_columns(all_column_names.begin(), all_column_names.end()); + // Currently only used in iceberg, the columns are dropped but added back + std::set dropped_columns(missing_column_names.begin(), missing_column_names.end()); + // Make the order of read columns the same as physical order in parquet file + for (int i = 0; i < schema_desc.size(); ++i) { + auto name = schema_desc.get_column(i)->name; + // If the column in parquet file is included in all_column_names and not in missing_column_names, + // add it to _map_column, which means the reader should read the data of this column. + // Here to check against missing_column_names is for the 'Add a column back to the table + // with the same column name' case. (drop column a then add column a). + // Shouldn't read this column data in this case. 
+            if (required_columns.find(name) != required_columns.end() &&
+                dropped_columns.find(name) == dropped_columns.end()) {
+                required_columns.erase(name);
+                _read_columns.emplace_back(name);
+            }
+        }
+        for (const std::string &name: required_columns) {
+            _missing_cols.emplace_back(name);
         }
-    for (const std::string& name : required_columns) {
-        _missing_cols.emplace_back(name);
+    }
+    else {
+        std::unordered_map new_colname_to_value_range;
+        const auto& column_idx = _scan_params.column_idxs;
+        for(int i =0 ;i < column_idx.size();i++) {
+            auto id = column_idx[i];
+            if (id >= _file_metadata->schema().size()){
+                _missing_cols.emplace_back( all_column_names[i]);
+            } else {
+                auto& table_col = all_column_names[i];
+                auto file_col = _file_metadata->schema().get_column(i)->name;
+                _read_columns.emplace_back(file_col);
+
+                if (table_col != file_col) {
+                    _table_col_to_file_col[table_col] = file_col;
+                    auto iter = _colname_to_value_range->find(table_col);
+                    if (iter == _colname_to_value_range->end()) {
+                        continue;
+                    }
+                    new_colname_to_value_range[file_col] = iter->second;
+                    _colname_to_value_range->erase(iter->first);
+                }
+            }
+        }
+        for(auto it:new_colname_to_value_range ) {
+            _colname_to_value_range->emplace( it.first,std::move(it.second));
+        }
     }
-
-    _colname_to_value_range = colname_to_value_range;
     // build column predicates for column lazy read
     _lazy_read_ctx.conjuncts = conjuncts;
     RETURN_IF_ERROR(_init_row_groups(filter_groups));
@@ -525,6 +555,16 @@ Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof)
         return Status::OK();
     }
 
+    if (!_hive_use_column_names){
+        for(auto i = 0; i < block->get_names().size();i++){
+            auto& col = block->get_by_position(i);
+            if (_table_col_to_file_col.contains(col.name)){
+                col.name = _table_col_to_file_col[col.name];
+            }
+        }
+        block->initialize_index_by_name();
+    }
+
     SCOPED_RAW_TIMER(&_statistics.column_read_time);
     Status batch_st =
             _current_group_reader->next_batch(block, _batch_size, read_rows, &_row_group_eof);
@@ -535,6 +575,13 @@ Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof)
         *eof = true;
         return Status::OK();
     }
+
+    if (!_hive_use_column_names) {
+        for (auto i = 0; i < block->columns(); i++) {
+            block->get_by_position(i).name = (*_column_names)[i];
+        }
+        block->initialize_index_by_name();
+    }
     if (!batch_st.ok()) {
         return Status::InternalError("Read parquet file {} failed, reason = {}", _scan_range.path,
                                      batch_st.to_string());
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 38b3d71a466019..1dce8492ccc8d1 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -116,7 +116,7 @@ class ParquetReader : public GenericReader {
             const std::unordered_map* colname_to_slot_id,
             const VExprContextSPtrs* not_single_slot_filter_conjuncts,
             const std::unordered_map* slot_id_to_filter_conjuncts,
-            bool filter_groups = true);
+            bool filter_groups = true,const bool hive_use_column_names = true);
 
     Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
 
@@ -282,5 +282,6 @@ class ParquetReader : public GenericReader {
     const std::unordered_map* _colname_to_slot_id = nullptr;
     const VExprContextSPtrs* _not_single_slot_filter_conjuncts = nullptr;
     const std::unordered_map* _slot_id_to_filter_conjuncts = nullptr;
+    bool _hive_use_column_names = false;
 };
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp 
b/be/src/vec/exec/format/table/iceberg_reader.cpp index d321fc016f4d42..734ada11371f97 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -120,7 +120,7 @@ Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* // To support iceberg schema evolution. We change the column name in block to // make it match with the column name in parquet file before reading data. and // Set the name back to table column name before return this block. - if (_has_schema_change) { + if (_has_schema_change && _file_format == PARQUET) { for (int i = 0; i < block->columns(); i++) { ColumnWithTypeAndName& col = block->get_by_position(i); auto iter = _table_col_to_file_col.find(col.name); @@ -133,7 +133,7 @@ Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* RETURN_IF_ERROR(_file_format_reader->get_next_block(block, read_rows, eof)); // Set the name back to table column name before return this block. - if (_has_schema_change) { + if (_has_schema_change && _file_format == PARQUET) { for (int i = 0; i < block->columns(); i++) { ColumnWithTypeAndName& col = block->get_by_position(i); auto iter = _file_col_to_table_col.find(col.name); @@ -620,11 +620,11 @@ Status IcebergOrcReader::init_reader( RETURN_IF_ERROR(_gen_col_name_maps(orc_reader)); _gen_file_col_names(); _gen_new_colname_to_value_range(); - orc_reader->set_table_col_to_file_col(_table_col_to_file_col); + orc_reader->set_col_name_to_file_col_name(_table_col_to_file_col); RETURN_IF_ERROR(init_row_filters(_range)); return orc_reader->init_reader(&_all_required_col_names, &_new_colname_to_value_range, conjuncts, false, tuple_descriptor, row_descriptor, - not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts); + not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts,true); } Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete_range, diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index e066905895e6a2..154e354cf5f96a 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -824,12 +824,18 @@ Status VFileScanner::_get_next_reader() { RETURN_IF_ERROR(paimon_reader->init_row_filters(range)); _cur_reader = std::move(paimon_reader); } else { + + bool hive_parquet_use_column_names = range.__isset.table_format_params && + range.table_format_params.table_format_type == "hive" + && _state != nullptr + && _state->query_options().hive_parquet_use_column_names; + std::vector place_holder; init_status = parquet_reader->init_reader( _file_col_names, place_holder, _colname_to_value_range, _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(), _col_name_to_slot_id, &_not_single_slot_filter_conjuncts, - &_slot_id_to_filter_conjuncts); + &_slot_id_to_filter_conjuncts,true,hive_parquet_use_column_names); _cur_reader = std::move(parquet_reader); } need_to_get_parsed_schema = true; @@ -885,10 +891,15 @@ Status VFileScanner::_get_next_reader() { RETURN_IF_ERROR(paimon_reader->init_row_filters(range)); _cur_reader = std::move(paimon_reader); } else { + bool hive_orc_use_column_names = range.__isset.table_format_params && + range.table_format_params.table_format_type == "hive" + && _state != nullptr + && _state->query_options().hive_orc_use_column_names; + hive_orc_use_column_names = false; init_status = orc_reader->init_reader( &_file_col_names, _colname_to_value_range, _push_down_conjuncts, false, _real_tuple_desc, 
_default_val_row_desc.get(), - &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts); + &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts,hive_orc_use_column_names); _cur_reader = std::move(orc_reader); } need_to_get_parsed_schema = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java index 1bdb805f0fd96d..fa43d93d012e8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java @@ -34,6 +34,7 @@ import org.apache.doris.common.util.Util; import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.FileSplit; +import org.apache.doris.datasource.TableFormatType; import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.hive.HiveMetaStoreCache; @@ -51,8 +52,10 @@ import org.apache.doris.thrift.TFileAttributes; import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileRangeDesc; import org.apache.doris.thrift.TFileTextScanRangeParams; import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TTableFormatFileDesc; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -138,6 +141,19 @@ protected void doInitialize() throws UserException { } } + @Override + protected void setScanParams(TFileRangeDesc rangeDesc, Split split) { + if (split instanceof HiveSplit) { + setScanParams(rangeDesc, (HiveSplit) split); + } + } + + public void setScanParams(TFileRangeDesc rangeDesc, HiveSplit hiveSplit) { + TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); + tableFormatFileDesc.setTableFormatType(TableFormatType.HIVE.value()); + rangeDesc.setTableFormatParams(tableFormatFileDesc); + } + protected List getPartitions() throws AnalysisException { List resPartitions = Lists.newArrayList(); long start = System.currentTimeMillis(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 46fb386fc30edc..829e4f624aae76 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -590,6 +590,10 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_PUSHDOWN_MINMAX_ON_UNIQUE = "enable_pushdown_minmax_on_unique"; + public static final String HIVE_PARQUET_USE_COLUMN_NAMES = "hive_parquet_use_column_names"; + + public static final String HIVE_ORC_USE_COLUMN_NAMES = "hive_orc_use_column_names"; + public static final String KEEP_CARRIAGE_RETURN = "keep_carriage_return"; public static final String ENABLE_PUSHDOWN_STRING_MINMAX = "enable_pushdown_string_minmax"; @@ -1873,11 +1877,25 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) { public int createTablePartitionMaxNum = 10000; + @VariableMgr.VarAttr(name = HIVE_PARQUET_USE_COLUMN_NAMES, + description = {"默认情况下按名称访问 Parquet 列。将此属性设置为“false”可按 Hive 表定义中的序号位置访问列。", + "Access Parquet columns by name by default. 
Set this property to `false` to access columns "
+                    + "by their ordinal position in the Hive table definition."})
+    public boolean hiveParquetUseColumnNames = true;
+
+
+    @VariableMgr.VarAttr(name = HIVE_ORC_USE_COLUMN_NAMES,
+            description = {"默认情况下按名称访问 Orc 列。将此属性设置为“false”可按 Hive 表定义中的序号位置访问列。",
+                    "Access Orc columns by name by default. Set this property to `false` to access columns "
+                            + "by their ordinal position in the Hive table definition."})
+    public boolean hiveOrcUseColumnNames = true;
+
     @VariableMgr.VarAttr(name = KEEP_CARRIAGE_RETURN,
             description = {"在同时处理\r和\r\n作为CSV的行分隔符时,是否保留\r",
                     "When processing both \\n and \\r\\n as CSV line separators, should \\r be retained?"})
     public boolean keepCarriageReturn = false;
+
     @VariableMgr.VarAttr(name = FORCE_JNI_SCANNER,
             description = {"强制使用jni方式读取外表", "Force the use of jni mode to read external table"})
     private boolean forceJniScanner = false;
@@ -3653,7 +3671,8 @@ public TQueryOptions toThrift() {
 
         tResult.setEnableMatchWithoutInvertedIndex(enableMatchWithoutInvertedIndex);
         tResult.setEnableFallbackOnMissingInvertedIndex(enableFallbackOnMissingInvertedIndex);
-
+        tResult.setHiveOrcUseColumnNames(hiveOrcUseColumnNames);
+        tResult.setHiveParquetUseColumnNames(hiveParquetUseColumnNames);
         tResult.setKeepCarriageReturn(keepCarriageReturn);
         return tResult;
     }
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index feb3f66a160351..7aa640f80b8e58 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -324,6 +324,12 @@ struct TQueryOptions {
   121: optional bool keep_carriage_return = false; // \n,\r\n split line in CSV.
   122: optional i32 runtime_bloom_filter_min_size = 1048576;
+
+  // Access Parquet/ORC columns by name by default. Set this property to `false` to access columns
+  // by their ordinal position in the Hive table definition. 
+ 123: optional bool hive_parquet_use_column_names = true; + 124: optional bool hive_orc_use_column_names = true; + // For cloud, to control if the content would be written into file cache 1000: optional bool disable_file_cache = false } From b62eebffa18d79dc2495eede982bf51f31f763d3 Mon Sep 17 00:00:00 2001 From: daidai <2017501503@qq.com> Date: Sat, 27 Jul 2024 23:36:44 +0800 Subject: [PATCH 2/4] fix error --- be/src/vec/exec/format/orc/vorc_reader.cpp | 30 +- .../exec/format/parquet/vparquet_reader.cpp | 40 +- .../vec/exec/format/parquet/vparquet_reader.h | 2 +- .../vec/exec/format/table/iceberg_reader.cpp | 3 +- be/src/vec/exec/scan/vfile_scanner.cpp | 28 +- .../create_preinstalled_scripts/run64.hql | 29 ++ .../orc_table/simulation_hive1_orc/000000_0 | Bin 0 -> 408 bytes .../test_hive_rename_column_orc/000000_0 | Bin 0 -> 405 bytes .../000000_0_copy_1 | Bin 0 -> 396 bytes .../000000_0_copy_2 | Bin 0 -> 554 bytes .../000000_0_copy_3 | Bin 0 -> 592 bytes .../test_hive_rename_column_parquet/000000_0 | Bin 0 -> 538 bytes .../000000_0_copy_1 | Bin 0 -> 543 bytes .../000000_0_copy_2 | Bin 0 -> 787 bytes .../000000_0_copy_3 | Bin 0 -> 801 bytes .../test_hive_rename_column_orc_parquet.out | 435 ++++++++++++++++++ ...test_hive_rename_column_orc_parquet.groovy | 196 ++++++++ 17 files changed, 705 insertions(+), 58 deletions(-) create mode 100644 docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run64.hql create mode 100644 docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/orc_table/simulation_hive1_orc/000000_0 create mode 100644 docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/orc_table/test_hive_rename_column_orc/000000_0 create mode 100644 docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/orc_table/test_hive_rename_column_orc/000000_0_copy_1 create mode 100644 docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/orc_table/test_hive_rename_column_orc/000000_0_copy_2 create mode 100644 docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/orc_table/test_hive_rename_column_orc/000000_0_copy_3 create mode 100644 docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/test_hive_rename_column_parquet/000000_0 create mode 100644 docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/test_hive_rename_column_parquet/000000_0_copy_1 create mode 100644 docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/test_hive_rename_column_parquet/000000_0_copy_2 create mode 100644 docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/test_hive_rename_column_parquet/000000_0_copy_3 create mode 100644 regression-test/data/external_table_p0/hive/test_hive_rename_column_orc_parquet.out create mode 100644 regression-test/suites/external_table_p0/hive/test_hive_rename_column_orc_parquet.groovy diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 466e3525a3a964..cb654c330246b7 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -335,11 +335,10 @@ Status OrcReader::_init_read_columns() { bool is_hive1_orc = false; _init_orc_cols(root_type, orc_cols, orc_cols_lower_case, _type_map, &is_hive1_orc); -// std::cout << "_scan_params.column_idxs.size() => "<< _scan_params.column_idxs.size() <<"\n"; -// std :: cout << "_scan_params.__isset.slot_name_to_schema_pos =>" << _scan_params.__isset.slot_name_to_schema_pos <<"\n"; // 
In old version slot_name_to_schema_pos may not be set in _scan_params // TODO, should be removed in 2.2 or later - _is_hive1_orc_or_use_idx = (is_hive1_orc || _is_hive1_orc_or_use_idx) && _scan_params.__isset.slot_name_to_schema_pos; + _is_hive1_orc_or_use_idx = (is_hive1_orc || _is_hive1_orc_or_use_idx) && + _scan_params.__isset.slot_name_to_schema_pos; for (size_t i = 0; i < _column_names->size(); ++i) { auto& col_name = (*_column_names)[i]; if (_is_hive1_orc_or_use_idx) { @@ -381,7 +380,7 @@ Status OrcReader::_init_read_columns() { _removed_acid_file_col_name_to_schema_col[orc_cols[pos]] = col_name; } - if (!_provide_column_name_mapping){ + if (!_provide_column_name_mapping) { _col_name_to_file_col_name[col_name] = read_col; } } @@ -713,10 +712,8 @@ bool OrcReader::_init_search_argument( if (iter == colname_to_value_range->end()) { continue; } -// auto type_it = type_map.find(col_name); auto type_it = type_map.find(_col_name_to_file_col_name[col_name]); if (type_it == type_map.end()) { - std::cout <<"no found\n"; continue; } std::visit( @@ -753,10 +750,6 @@ Status OrcReader::set_fill_columns( std::function visit_slot = [&](VExpr* expr) { if (VSlotRef* slot_ref = typeid_cast(expr)) { auto expr_name = slot_ref->expr_name(); -// auto iter = _table_col_to_file_col.find(expr_name); -// if (iter != _table_col_to_file_col.end()) { -// expr_name = iter->second; -// } predicate_columns.emplace(expr_name, std::make_pair(slot_ref->column_id(), slot_ref->slot_id())); if (slot_ref->column_id() == 0) { @@ -1547,17 +1540,6 @@ std::string OrcReader::get_field_name_lower_case(const orc::Type* orc_type, int } Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { - -// auto origin_names = block->get_names(); -// for(auto i = 0; i< block->get_names().size();i++){ -// auto& col =block->get_by_position(i); -// if (_table_col_to_file_col.contains(col.name)){ -// col.name = _table_col_to_file_col[col.name]; -// } -// } -// block->initialize_index_by_name(); - - RETURN_IF_ERROR(get_next_block_impl(block, read_rows, eof)); if (_orc_filter) { RETURN_IF_ERROR(_orc_filter->get_status()); @@ -1565,12 +1547,6 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { if (_string_dict_filter) { RETURN_IF_ERROR(_string_dict_filter->get_status()); } - -// for(auto i = 0; i< block->columns();i++){ -// block->get_by_position(i).name = origin_names[i]; -// } -// block->initialize_index_by_name(); - return Status::OK(); } diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index ebcf3ba3fb20ee..74f6001ab2a681 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -22,6 +22,7 @@ #include #include +#include #include #include @@ -321,12 +322,12 @@ Status ParquetReader::init_reader( // missing_column_names are the columns required by user sql but not in the parquet file, // e.g. table added a column after this parquet file was written. 
_column_names = &all_column_names; - + auto schema_desc = _file_metadata->schema(); if (_hive_use_column_names) { - auto schema_desc = _file_metadata->schema(); std::set required_columns(all_column_names.begin(), all_column_names.end()); // Currently only used in iceberg, the columns are dropped but added back - std::set dropped_columns(missing_column_names.begin(), missing_column_names.end()); + std::set dropped_columns(missing_column_names.begin(), + missing_column_names.end()); // Make the order of read columns the same as physical order in parquet file for (int i = 0; i < schema_desc.size(); ++i) { auto name = schema_desc.get_column(i)->name; @@ -341,20 +342,23 @@ Status ParquetReader::init_reader( _read_columns.emplace_back(name); } } - for (const std::string &name: required_columns) { + for (const std::string& name : required_columns) { _missing_cols.emplace_back(name); } - } - else { + } else { std::unordered_map new_colname_to_value_range; - const auto& column_idx = _scan_params.column_idxs; - for(int i =0 ;i < column_idx.size();i++) { - auto id = column_idx[i]; - if (id >= _file_metadata->schema().size()){ - _missing_cols.emplace_back( all_column_names[i]); + const auto& table_column_idxs = _scan_params.column_idxs; + std::map table_col_id_to_idx; + for (int i = 0; i < table_column_idxs.size(); i++) { + table_col_id_to_idx.insert({table_column_idxs[i], i}); + } + + for (auto [id, idx] : table_col_id_to_idx) { + if (id >= schema_desc.size()) { + _missing_cols.emplace_back(all_column_names[idx]); } else { - auto& table_col = all_column_names[i]; - auto file_col = _file_metadata->schema().get_column(i)->name; + auto& table_col = all_column_names[idx]; + auto file_col = schema_desc.get_column(id)->name; _read_columns.emplace_back(file_col); if (table_col != file_col) { @@ -368,8 +372,8 @@ Status ParquetReader::init_reader( } } } - for(auto it:new_colname_to_value_range ) { - _colname_to_value_range->emplace( it.first,std::move(it.second)); + for (auto it : new_colname_to_value_range) { + _colname_to_value_range->emplace(it.first, std::move(it.second)); } } // build column predicates for column lazy read @@ -555,10 +559,10 @@ Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof) return Status::OK(); } - if (!_hive_use_column_names){ - for(auto i = 0; i < block->get_names().size();i++){ + if (!_hive_use_column_names) { + for (auto i = 0; i < block->get_names().size(); i++) { auto& col = block->get_by_position(i); - if (_table_col_to_file_col.contains(col.name)){ + if (_table_col_to_file_col.contains(col.name)) { col.name = _table_col_to_file_col[col.name]; } } diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index 1dce8492ccc8d1..c79d9202c00d75 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -116,7 +116,7 @@ class ParquetReader : public GenericReader { const std::unordered_map* colname_to_slot_id, const VExprContextSPtrs* not_single_slot_filter_conjuncts, const std::unordered_map* slot_id_to_filter_conjuncts, - bool filter_groups = true,const bool hive_use_column_names = true); + bool filter_groups = true, const bool hive_use_column_names = true); Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index 734ada11371f97..b72d65d3cfd5c4 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp 
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -624,7 +624,8 @@ Status IcebergOrcReader::init_reader( RETURN_IF_ERROR(init_row_filters(_range)); return orc_reader->init_reader(&_all_required_col_names, &_new_colname_to_value_range, conjuncts, false, tuple_descriptor, row_descriptor, - not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts,true); + not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts, + true); } Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete_range, diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 154e354cf5f96a..95aea26e265cb0 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -824,18 +824,21 @@ Status VFileScanner::_get_next_reader() { RETURN_IF_ERROR(paimon_reader->init_row_filters(range)); _cur_reader = std::move(paimon_reader); } else { + bool hive_parquet_use_column_names = true; - bool hive_parquet_use_column_names = range.__isset.table_format_params && - range.table_format_params.table_format_type == "hive" - && _state != nullptr - && _state->query_options().hive_parquet_use_column_names; + if (range.__isset.table_format_params && + range.table_format_params.table_format_type == "hive" && _state != nullptr) + [[likely]] { + hive_parquet_use_column_names = + _state->query_options().hive_parquet_use_column_names; + } std::vector place_holder; init_status = parquet_reader->init_reader( _file_col_names, place_holder, _colname_to_value_range, _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(), _col_name_to_slot_id, &_not_single_slot_filter_conjuncts, - &_slot_id_to_filter_conjuncts,true,hive_parquet_use_column_names); + &_slot_id_to_filter_conjuncts, true, hive_parquet_use_column_names); _cur_reader = std::move(parquet_reader); } need_to_get_parsed_schema = true; @@ -891,15 +894,18 @@ Status VFileScanner::_get_next_reader() { RETURN_IF_ERROR(paimon_reader->init_row_filters(range)); _cur_reader = std::move(paimon_reader); } else { - bool hive_orc_use_column_names = range.__isset.table_format_params && - range.table_format_params.table_format_type == "hive" - && _state != nullptr - && _state->query_options().hive_orc_use_column_names; - hive_orc_use_column_names = false; + bool hive_orc_use_column_names = true; + + if (range.__isset.table_format_params && + range.table_format_params.table_format_type == "hive" && _state != nullptr) + [[likely]] { + hive_orc_use_column_names = _state->query_options().hive_orc_use_column_names; + } init_status = orc_reader->init_reader( &_file_col_names, _colname_to_value_range, _push_down_conjuncts, false, _real_tuple_desc, _default_val_row_desc.get(), - &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts,hive_orc_use_column_names); + &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts, + hive_orc_use_column_names); _cur_reader = std::move(orc_reader); } need_to_get_parsed_schema = true; diff --git a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run64.hql b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run64.hql new file mode 100644 index 00000000000000..744b83418db0d0 --- /dev/null +++ b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run64.hql @@ -0,0 +1,29 @@ +use default; + +create table simulation_hive1_orc( + `a` boolean, + `b` int, + `c` string +)stored as orc +LOCATION '/user/doris/preinstalled_data/orc_table/simulation_hive1_orc'; +msck repair 
table simulation_hive1_orc; + +create table test_hive_rename_column_parquet( + `new_a` boolean, + `new_b` int, + `c` string, + `new_d` int, + `f` string +)stored as parquet +LOCATION '/user/doris/preinstalled_data/parquet_table/test_hive_rename_column_parquet'; +msck repair table test_hive_rename_column_parquet; + +create table test_hive_rename_column_orc( + `new_a` boolean, + `new_b` int, + `c` string, + `new_d` int, + `f` string +)stored as orc +LOCATION '/user/doris/preinstalled_data/orc_table/test_hive_rename_column_orc'; +msck repair table test_hive_rename_column_orc; diff --git a/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/orc_table/simulation_hive1_orc/000000_0 b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/orc_table/simulation_hive1_orc/000000_0 new file mode 100644 index 0000000000000000000000000000000000000000..848dc3250eeb405096cbdfd87f76a97763e3c7da GIT binary patch literal 408 zcmV;J0cZYCQbQ2{015^Y1PB6904D$d3K0qf000005(x+bDgz1u0Z;%d000UT3IhND z01^xc0ulxY6c8vFJWv2i0084s;bLU~0U>D)CM8iW?u^u&oP34y{Gyx`A(rga)B*{) z00saB0094h2><|20Td`7003xZY;12JcW-iRWNT$*a0dVYMgg>F001q>!3h8$3F zU=;mG9&#nve}yK&$1<=}hwj2P!^)F_Ooi92{#y$6RA??-HH`8@^ZIfYK5G@BjegU>4ZVD8 z2Lli@12MY+lNAdGQvib%HwTjzGZ!Oc07C?;6(0wa5F3YxfQEz(kk4)<%fX~1%Eg_L znv;{SP@Z3ulOn{Dotj!8AqSL}aG2od!2kdVdk_H_fPfGp0ssssMlc70^aBG=QbQMw CH;_31 literal 0 HcmV?d00001 diff --git a/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/orc_table/test_hive_rename_column_orc/000000_0 b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/orc_table/test_hive_rename_column_orc/000000_0 new file mode 100644 index 0000000000000000000000000000000000000000..398aed3001fb0d227fabc108447f414145a261e1 GIT binary patch literal 405 zcmV;G0c!qFQbQ2{015^Y1PB6904D$d3K0qf000005(x+bDgz1u0Z;%d000UT3IhND z01^xc0ulxY6c8vFJWv2i0084s;bLU~0U>D)CM8iW?u^u&oP34y{Gyx`A(rga)B*{) z00saB0094h2><|20Td`7003xZY;12JcW-iRWNT$*a0dVYMgg>F001q>!3h8$3F zU=;mG9&#nve}yK&$1<=}hwj2P!^)F_Ooi92{#y$6RA??-HH`8@^ZIfYK5G>Hq-aU>4ZVD8BgD+e#4N>_D8-m0#h9$b#K8c> z%s|X;z+}b3!4$w?#m&K_#mvRX7{Cz0YQ@LFB*excBA_8*1LU(?$#O6$iE?pgq~_%0 zE0pIK<)jF)WT&PUNXP-DB^)OBc`yI~2zU?y7=VBfA_4#mC`K>`g7gCePf|k{24704>Nt4ge4gL{Uam!-81I zIDq?KQ8gAnFZoqeMqsPY$mN3DO+03_pIxxHiQR-L6B>SaU4Q@p<8t6);b012;Ns?B z(qiUfWDH=4VCCZHU=m{Iun`E5=m80^b4hV9DG710Bqk-N2yx`4mMbKtB&SG70Odph z=l}rYU>4ZLD8(aq$I(?BgD+e#4N>{ms%d52%(as7?YKlI2eGK8Hm{p zn56~Ny#Zf96)mv q5>t{>BqV@xq7D=MJQx57cMt&>fPfGp0ssssLof$|^aBG=QbQLJ7=sc3 literal 0 HcmV?d00001 diff --git a/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/orc_table/test_hive_rename_column_orc/000000_0_copy_2 b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/orc_table/test_hive_rename_column_orc/000000_0_copy_2 new file mode 100644 index 0000000000000000000000000000000000000000..84490d9f085582f3811c8cb8dd154b825845b301 GIT binary patch literal 554 zcmeYdau#G@;9?VE;b012PzN#vxmXy0K!}rrNsF0_kuiWl7bq&i#S9YU&f_+35IDgo zaR6wK6-eWAutr0mMn=79PyWnFQrzZcFhPJbfq{XQf#H7xCj)~YV@d@t14BhcMP+3r zI|G9g;|H*4#R&lh2Jds{HP^8)GXTZi7~Q%U7<|td@--OnFdw*<>!it@=l@ie z)y&PMMIt+^Kid|*o_Q;cGB|;-GW0r)Oicd&ks?oikfQyG-@B+Smy?sd^f!c^fYCFkGf-SZKgt zXe_Y#a0auQwiw?s1~%jKyg7^#eanIRt$_L`Jp}7d1nQ5|n|5T$6A{H{SGJUJ?qgtMU}a$V-@wVh;K$gr zK%9XgBM}6Xa}%>u*%=tz7*lx}7~&XR3;g8x85k0il2cOC(!mnWJq!%Krw(#881OJ( zSR4I%jl!cumKMW5|NFPF@yi`jR?Iuw*K#)A*h(~J%Y$X2&01+Q9xo5saG}X|%7q)+ zVXF$>aak{2s{iRc^Ch4QPd!e0n820r;L)>$OG(d?Qg{+N5}pb@Nf3JU=XiItJ~5hk_0UH+y=vHoJFBo!B|E zHMGlAf2)n1pp&Ynp_+4qW>8P2myZmO!x^3ps5)%WD QMveh{(-&rD{~%`x07}Bh1ONa4 literal 0 HcmV?d00001 diff 
--git a/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/test_hive_rename_column_parquet/000000_0 b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/test_hive_rename_column_parquet/000000_0 new file mode 100644 index 0000000000000000000000000000000000000000..deea62bcfb37bcaa2e195fd010ac80324349fe99 GIT binary patch literal 538 zcmZva%SyvQ6o${Fp#)ot3nxrq7TFjoXq+^?S+eO)#6{3ecQQ#w6HG6rH}DbqaK%UR zq-qUC=5o%=`M;U@VZz~~M}anIGX3fF#QB) zRF_r1ojbgiBjaiS)Y zR`K8qjS&e{D`Et+Uc~qUU!+^y3LJw|usQ}*EtFB*HA6FWjNr(mg)(e~Q)k5SrXM8Q zBWrW*i~?42c;vT??+B;$Uq0URTVigBvm?R(zDuhk?&mUpU#aEwB)8WpU!=3C?YW+Ch24x+X*RZn@5yer(-z`Z`BA?m jT0(d&>Gj)wJMQ#b(YV)+d%nE10uJ!q;e`N%_$$5u60cK~ literal 0 HcmV?d00001 diff --git a/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/test_hive_rename_column_parquet/000000_0_copy_1 b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/test_hive_rename_column_parquet/000000_0_copy_1 new file mode 100644 index 0000000000000000000000000000000000000000..45ae5dee1abbaaf19365ae49eb9534e75e82087c GIT binary patch literal 543 zcmZuv&uhXk82wT^Da^?(DJg`}i=~6jrM3PT^x|!d9m;m`CTSaHVb)o-!Vdm*JNq~7 z)2S;m^6Mq-L5ir5suK=LosMQG4obrlJcadQF4bG^p z>`tB?wG*K=puwND2dsYd- zu?tj;B~a~%W1tPhBx&KBbc6Y38YJ2KH|ECKlJKAW6{GpBnL1dUU9XF5sfzY8o2vHLELZGm zxX9$g+eGH?6PW>f{*=XhrIz>8!Y);@$Y#0iwLQ@mwi~aqkJJ_&PxgA4ekZ=;tK+QS0 z5DkAqa3+AazrOdZXSs869a5nCbpMz-n3QOlGeQ^v7lLI301I#1COGDl8+%S02h54^ ziQ7h1VDv~YAvB=D8rp=wUMv>Op3mpSvW*$mI^gWDT8aaMluFoi27<$K&3eU*;6NMyU+mih$z8?YJGk-2P>=hpTvMul-w&?_t2=C2Z!-{M+`o z1OK{iQhX3jcUK%FS>n5H*Z&~x4ew3JyS0`jX(9P;q2}7{U*4MpZ?>V?EePIht-1LH z44C`sxDI%n9X#jRMCH{-pFZd(? I9sC`>01v)%dH?_b literal 0 HcmV?d00001 diff --git a/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/test_hive_rename_column_parquet/000000_0_copy_3 b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/parquet_table/test_hive_rename_column_parquet/000000_0_copy_3 new file mode 100644 index 0000000000000000000000000000000000000000..97bb0ab8475563bd29935ed9628e2dba64957985 GIT binary patch literal 801 zcmZWo$xg#C5OtPT6-(h_%aL-ZkO-xMlB6qAFI*7f6v2f9QspFW6X*izCiQFh3{E}w z#5XYxB{fA(;_?75fh8%f+2h586ywT50Zl@#s2#qoeAI1lONyhDN_-U zNCUNkD}Q`7-s@AH`9YXf>kx^A20NfwH3oP4Q9S3}Nf7ZN!+Rifs?ywuFS-n2^*8nr zK7d#WPQfb6hMQ~;vP3sbM7X_9#_ z6?h>p)=CrR(jX)b`Ot|DBJ_BX#8aMBr}2 Date: Sun, 28 Jul 2024 18:21:46 +0800 Subject: [PATCH 3/4] fix iceberg error and hive transactional --- be/src/vec/exec/format/orc/vorc_reader.cpp | 19 +++++++++++++++---- be/src/vec/exec/format/orc/vorc_reader.h | 4 ++-- .../vec/exec/format/table/iceberg_reader.cpp | 7 +++---- .../doris/datasource/FileQueryScanNode.java | 4 ++++ .../datasource/hive/source/HiveScanNode.java | 16 ---------------- 5 files changed, 24 insertions(+), 26 deletions(-) diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index cb654c330246b7..d49b21c62ace02 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -321,7 +321,7 @@ Status OrcReader::get_schema_col_name_attribute(std::vector* col_na RETURN_IF_ERROR(_create_file_reader()); auto& root_type = _is_acid ? 
_remove_acid(_reader->getType()) : _reader->getType(); for (int i = 0; i < root_type.getSubtypeCount(); ++i) { - col_names->emplace_back(get_field_name_lower_case(&root_type, i)); + col_names->emplace_back(root_type.getFieldName(i)); col_attributes->emplace_back( std::stol(root_type.getSubtype(i)->getAttributeValue(attribute))); } @@ -335,6 +335,19 @@ Status OrcReader::_init_read_columns() { bool is_hive1_orc = false; _init_orc_cols(root_type, orc_cols, orc_cols_lower_case, _type_map, &is_hive1_orc); + if (_is_iceberg_provide_column_name_mapping) { + for (auto col_name : *_column_names) { + auto it = _col_name_to_file_col_name.find(col_name); + if (it != _col_name_to_file_col_name.end()) { + _read_cols.emplace_back(it->second); + _read_cols_lower_case.emplace_back(col_name); + } else { + _missing_cols.emplace_back(col_name); + } + } + return Status::OK(); + } + // In old version slot_name_to_schema_pos may not be set in _scan_params // TODO, should be removed in 2.2 or later _is_hive1_orc_or_use_idx = (is_hive1_orc || _is_hive1_orc_or_use_idx) && @@ -380,9 +393,7 @@ Status OrcReader::_init_read_columns() { _removed_acid_file_col_name_to_schema_col[orc_cols[pos]] = col_name; } - if (!_provide_column_name_mapping) { - _col_name_to_file_col_name[col_name] = read_col; - } + _col_name_to_file_col_name[col_name] = read_col; } } return Status::OK(); diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index 1ce13983c1a970..591ba9935d7271 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -186,7 +186,7 @@ class OrcReader : public GenericReader { std::string attribute); void set_col_name_to_file_col_name( std::unordered_map col_name_to_file_col_name) { - _provide_column_name_mapping = true; + _is_iceberg_provide_column_name_mapping = true; _col_name_to_file_col_name = col_name_to_file_col_name; } @@ -581,7 +581,7 @@ class OrcReader : public GenericReader { // i.e. set_col_name_to_file_col_name() ? // If provide_column_name_mapping is true, it means that the mapping you provided will be used. // Iceberg reader should provide such mapping. 
- bool _provide_column_name_mapping = false; + bool _is_iceberg_provide_column_name_mapping = false; std::unordered_map _col_name_to_file_col_name; std::unordered_map _type_map; std::vector _col_orc_type; diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index b72d65d3cfd5c4..a8ac0cc2b0fac6 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -622,10 +622,9 @@ Status IcebergOrcReader::init_reader( _gen_new_colname_to_value_range(); orc_reader->set_col_name_to_file_col_name(_table_col_to_file_col); RETURN_IF_ERROR(init_row_filters(_range)); - return orc_reader->init_reader(&_all_required_col_names, &_new_colname_to_value_range, - conjuncts, false, tuple_descriptor, row_descriptor, - not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts, - true); + return orc_reader->init_reader( + &_file_col_names, &_new_colname_to_value_range, conjuncts, false, tuple_descriptor, + row_descriptor, not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts, true); } Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete_range, diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java index a5c5f501a1ba0b..d41d8e8c3a995c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java @@ -422,6 +422,10 @@ private TScanRangeLocations splitToScanRange( transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs); tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc); rangeDesc.setTableFormatParams(tableFormatFileDesc); + } else if (fileSplit instanceof HiveSplit) { + TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); + tableFormatFileDesc.setTableFormatType(TableFormatType.HIVE.value()); + rangeDesc.setTableFormatParams(tableFormatFileDesc); } setScanParams(rangeDesc, fileSplit); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java index fa43d93d012e8d..1bdb805f0fd96d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java @@ -34,7 +34,6 @@ import org.apache.doris.common.util.Util; import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.FileSplit; -import org.apache.doris.datasource.TableFormatType; import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.hive.HiveMetaStoreCache; @@ -52,10 +51,8 @@ import org.apache.doris.thrift.TFileAttributes; import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.thrift.TFileFormatType; -import org.apache.doris.thrift.TFileRangeDesc; import org.apache.doris.thrift.TFileTextScanRangeParams; import org.apache.doris.thrift.TFileType; -import org.apache.doris.thrift.TTableFormatFileDesc; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -141,19 +138,6 @@ protected void doInitialize() throws UserException { } } - @Override - protected void setScanParams(TFileRangeDesc rangeDesc, Split split) { - if (split instanceof HiveSplit) { - 
setScanParams(rangeDesc, (HiveSplit) split); - } - } - - public void setScanParams(TFileRangeDesc rangeDesc, HiveSplit hiveSplit) { - TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); - tableFormatFileDesc.setTableFormatType(TableFormatType.HIVE.value()); - rangeDesc.setTableFormatParams(tableFormatFileDesc); - } - protected List getPartitions() throws AnalysisException { List resPartitions = Lists.newArrayList(); long start = System.currentTimeMillis(); From 5cfcedd8f8c84899c8483d037dce8fddc55171e0 Mon Sep 17 00:00:00 2001 From: daidai <2017501503@qq.com> Date: Mon, 29 Jul 2024 15:24:54 +0800 Subject: [PATCH 4/4] remove iceberg --- be/src/vec/exec/format/orc/vorc_reader.cpp | 19 +++++-------------- be/src/vec/exec/format/orc/vorc_reader.h | 16 +++++++--------- .../vec/exec/format/table/iceberg_reader.cpp | 12 ++++++------ 3 files changed, 18 insertions(+), 29 deletions(-) diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index d49b21c62ace02..2e8328dc2ad379 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -321,7 +321,7 @@ Status OrcReader::get_schema_col_name_attribute(std::vector* col_na RETURN_IF_ERROR(_create_file_reader()); auto& root_type = _is_acid ? _remove_acid(_reader->getType()) : _reader->getType(); for (int i = 0; i < root_type.getSubtypeCount(); ++i) { - col_names->emplace_back(root_type.getFieldName(i)); + col_names->emplace_back(get_field_name_lower_case(&root_type, i)); col_attributes->emplace_back( std::stol(root_type.getSubtype(i)->getAttributeValue(attribute))); } @@ -335,19 +335,6 @@ Status OrcReader::_init_read_columns() { bool is_hive1_orc = false; _init_orc_cols(root_type, orc_cols, orc_cols_lower_case, _type_map, &is_hive1_orc); - if (_is_iceberg_provide_column_name_mapping) { - for (auto col_name : *_column_names) { - auto it = _col_name_to_file_col_name.find(col_name); - if (it != _col_name_to_file_col_name.end()) { - _read_cols.emplace_back(it->second); - _read_cols_lower_case.emplace_back(col_name); - } else { - _missing_cols.emplace_back(col_name); - } - } - return Status::OK(); - } - // In old version slot_name_to_schema_pos may not be set in _scan_params // TODO, should be removed in 2.2 or later _is_hive1_orc_or_use_idx = (is_hive1_orc || _is_hive1_orc_or_use_idx) && @@ -761,6 +748,10 @@ Status OrcReader::set_fill_columns( std::function visit_slot = [&](VExpr* expr) { if (VSlotRef* slot_ref = typeid_cast(expr)) { auto expr_name = slot_ref->expr_name(); + auto iter = _table_col_to_file_col.find(expr_name); + if (iter != _table_col_to_file_col.end()) { + expr_name = iter->second; + } predicate_columns.emplace(expr_name, std::make_pair(slot_ref->column_id(), slot_ref->slot_id())); if (slot_ref->column_id() == 0) { diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index 591ba9935d7271..c0b372dfcea5ee 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -184,10 +184,9 @@ class OrcReader : public GenericReader { Status get_schema_col_name_attribute(std::vector* col_names, std::vector* col_attributes, std::string attribute); - void set_col_name_to_file_col_name( - std::unordered_map col_name_to_file_col_name) { - _is_iceberg_provide_column_name_mapping = true; - _col_name_to_file_col_name = col_name_to_file_col_name; + void set_table_col_to_file_col( + std::unordered_map table_col_to_file_col) { + _table_col_to_file_col = 
table_col_to_file_col;
     }
 
     void set_position_delete_rowids(vector* delete_rows) {
@@ -577,7 +576,6 @@ class OrcReader : public GenericReader {
     // 2. If true, use indexes instead of column names when reading orc tables.
     bool _is_hive1_orc_or_use_idx = false;
 
-    // Have you provided a mapping from the table column name to the file column name,
-    // i.e. set_col_name_to_file_col_name() ?
-    // If provide_column_name_mapping is true, it means that the mapping you provided will be used.
-    // Iceberg reader should provide such mapping.
-    bool _is_iceberg_provide_column_name_mapping = false;
     std::unordered_map _col_name_to_file_col_name;
     std::unordered_map _type_map;
     std::vector _col_orc_type;
@@ -629,6 +623,10 @@ class OrcReader : public GenericReader {
     // resolve schema change
     std::unordered_map> _converters;
+    // For iceberg table, when table column name != file column name.
+    // TODO(CXY): remove _table_col_to_file_col, because we have _col_name_to_file_col_name;
+    // the two have the same effect.
+    std::unordered_map _table_col_to_file_col;
     //support iceberg position delete .
     std::vector* _position_delete_ordered_rowids = nullptr;
 };
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index a8ac0cc2b0fac6..d321fc016f4d42 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -120,7 +120,7 @@ Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool*
     // To support iceberg schema evolution. We change the column name in block to
     // make it match with the column name in parquet file before reading data. and
     // Set the name back to table column name before return this block.
-    if (_has_schema_change && _file_format == PARQUET) {
+    if (_has_schema_change) {
         for (int i = 0; i < block->columns(); i++) {
             ColumnWithTypeAndName& col = block->get_by_position(i);
             auto iter = _table_col_to_file_col.find(col.name);
@@ -133,7 +133,7 @@ Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool*
     RETURN_IF_ERROR(_file_format_reader->get_next_block(block, read_rows, eof));
 
     // Set the name back to table column name before return this block.
-    if (_has_schema_change && _file_format == PARQUET) {
+    if (_has_schema_change) {
         for (int i = 0; i < block->columns(); i++) {
             ColumnWithTypeAndName& col = block->get_by_position(i);
             auto iter = _file_col_to_table_col.find(col.name);
@@ -620,11 +620,11 @@ Status IcebergOrcReader::init_reader(
     RETURN_IF_ERROR(_gen_col_name_maps(orc_reader));
     _gen_file_col_names();
     _gen_new_colname_to_value_range();
-    orc_reader->set_col_name_to_file_col_name(_table_col_to_file_col);
+    orc_reader->set_table_col_to_file_col(_table_col_to_file_col);
     RETURN_IF_ERROR(init_row_filters(_range));
-    return orc_reader->init_reader(
-            &_file_col_names, &_new_colname_to_value_range, conjuncts, false, tuple_descriptor,
-            row_descriptor, not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts, true);
+    return orc_reader->init_reader(&_all_required_col_names, &_new_colname_to_value_range,
+                                   conjuncts, false, tuple_descriptor, row_descriptor,
+                                   not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts);
 }
 
 Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete_range,
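---

Usage sketch: a minimal example of the two session variables added in this series, assuming the
preinstalled tables created by run64.hql above (actual result sets depend on the binary data files
shipped with the regression test):

    -- Both variables default to true, so Hive columns are resolved by name.
    -- After a column rename in the Hive metastore, old Parquet/ORC data files still carry
    -- the old physical column names, which name-based resolution can no longer match.
    -- Setting the variables to false resolves columns by their ordinal position in the
    -- Hive table definition instead:
    set hive_parquet_use_column_names=false;
    set hive_orc_use_column_names=false;
    select * from test_hive_rename_column_parquet;
    select * from test_hive_rename_column_orc;

    -- Restore the defaults to return to name-based resolution:
    set hive_parquet_use_column_names=true;
    set hive_orc_use_column_names=true;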