From 34f3edbd54d0212c431894ac8df48fe4900f89bd Mon Sep 17 00:00:00 2001
From: Ashin Gau
Date: Fri, 17 Nov 2023 18:38:55 +0800
Subject: [PATCH] [fix](iceberg) iceberg uses custom method to encode special
 characters of field name (#27108)

Fix two bugs:
1. Missing-column matching is case sensitive; change column names to lower
   case in the FE for hive/iceberg/hudi.
2. Iceberg uses a custom method to encode special characters in column names;
   decode the column names in the parquet reader so they match the right
   columns.
---
 .../vec/exec/format/parquet/schema_desc.cpp      | 67 +++++++++++++++++++
 be/src/vec/exec/format/parquet/schema_desc.h     |  4 ++
 .../format/parquet/vparquet_file_metadata.h      |  3 +
 .../exec/format/parquet/vparquet_reader.cpp      |  6 ++
 .../vec/exec/format/parquet/vparquet_reader.h    |  3 +
 .../vec/exec/format/table/iceberg_reader.cpp     |  1 +
 be/src/vec/exec/scan/vfile_scanner.cpp           | 22 +++---
 be/src/vec/exec/scan/vfile_scanner.h             |  6 +-
 .../catalog/external/HMSExternalTable.java       |  6 +-
 .../external/IcebergExternalTable.java           |  3 +-
 .../test_external_catalog_iceberg_common.out     | 10 ++-
 .../test_external_catalog_iceberg_common.groovy  |  3 +
 12 files changed, 112 insertions(+), 22 deletions(-)

diff --git a/be/src/vec/exec/format/parquet/schema_desc.cpp b/be/src/vec/exec/format/parquet/schema_desc.cpp
index 1b830689c0ed3c..c9283c628890b6 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.cpp
+++ b/be/src/vec/exec/format/parquet/schema_desc.cpp
@@ -26,6 +26,7 @@
 #include "common/logging.h"
 #include "runtime/define_primitive_type.h"
 #include "util/slice.h"
+#include "util/string_util.h"
 
 namespace doris::vectorized {
 
@@ -239,6 +240,72 @@ TypeDescriptor FieldDescriptor::get_doris_type(const tparquet::SchemaElement& ph
     return type;
 }
 
+// Copy from org.apache.iceberg.avro.AvroSchemaUtil#validAvroName
+static bool is_valid_avro_name(const std::string& name) {
+    int length = name.length();
+    char first = name[0];
+    if (!isalpha(first) && first != '_') {
+        return false;
+    }
+
+    for (int i = 1; i < length; i++) {
+        char character = name[i];
+        if (!isalpha(character) && !isdigit(character) && character != '_') {
+            return false;
+        }
+    }
+    return true;
+}
+
+// Copy from org.apache.iceberg.avro.AvroSchemaUtil#sanitize
+static void sanitize_avro_name(std::ostringstream& buf, char character) {
+    if (isdigit(character)) {
+        buf << '_' << character;
+    } else {
+        std::stringstream ss;
+        ss << std::hex << (int)character;
+        std::string hex_str = ss.str();
+        buf << "_x" << doris::to_lower(hex_str);
+    }
+}
+
+// Copy from org.apache.iceberg.avro.AvroSchemaUtil#sanitize
+static std::string sanitize_avro_name(const std::string& name) {
+    std::ostringstream buf;
+    int length = name.length();
+    char first = name[0];
+    if (!isalpha(first) && first != '_') {
+        sanitize_avro_name(buf, first);
+    } else {
+        buf << first;
+    }
+
+    for (int i = 1; i < length; i++) {
+        char character = name[i];
+        if (!isalpha(character) && !isdigit(character) && character != '_') {
+            sanitize_avro_name(buf, character);
+        } else {
+            buf << character;
+        }
+    }
+    return buf.str();
+}
+
+void FieldDescriptor::iceberg_sanitize(const std::vector<std::string>& read_columns) {
+    for (const std::string& col : read_columns) {
+        if (!is_valid_avro_name(col)) {
+            std::string sanitize_name = sanitize_avro_name(col);
+            auto it = _name_to_field.find(sanitize_name);
+            if (it != _name_to_field.end()) {
+                FieldSchema* schema = const_cast<FieldSchema*>(it->second);
+                schema->name = col;
+                _name_to_field.emplace(col, schema);
+                _name_to_field.erase(sanitize_name);
+            }
+        }
+    }
+}
+
 TypeDescriptor FieldDescriptor::convert_to_doris_type(tparquet::LogicalType logicalType) {
     TypeDescriptor type;
     if (logicalType.__isset.STRING) {
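
To make the encoding concrete: Iceberg keeps field names Avro-safe by
prefixing a leading digit with '_' and hex-escaping every other invalid
character as "_x" plus its lower-case hex code. Below is a condensed,
standalone re-implementation for illustration only (it is not part of the
patch, the sample names are hypothetical, and only ASCII input is
considered):

    // sanitize_sketch.cpp - mirrors the sanitize_avro_name() logic above.
    #include <cctype>
    #include <iostream>
    #include <sstream>
    #include <string>

    static std::string sanitize(const std::string& name) {
        std::ostringstream buf;
        for (size_t i = 0; i < name.size(); ++i) {
            char c = name[i];
            if (isalpha(c) || c == '_' || (i > 0 && isdigit(c))) {
                buf << c; // already a valid Avro character
            } else if (isdigit(c)) {
                buf << '_' << c; // a digit is only invalid in first position
            } else {
                std::ostringstream hexbuf;
                hexbuf << std::hex << (int)c; // '/' -> "2f", already lower case
                buf << "_x" << hexbuf.str();
            }
        }
        return buf.str();
    }

    int main() {
        std::cout << sanitize("/dsd/sv_cnt_grp1") << "\n"; // _x2fdsd_x2fsv_cnt_grp1
        std::cout << sanitize("2nd_col") << "\n";          // _2nd_col
        std::cout << sanitize("matnr") << "\n";            // matnr (already valid)
        return 0;
    }

This is why the reader cannot simply look up the table column name in the
parquet footer: for a column like /dsd/sv_cnt_grp1 the footer carries the
_xHH form.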
diff --git a/be/src/vec/exec/format/parquet/schema_desc.h b/be/src/vec/exec/format/parquet/schema_desc.h
index fb61ad918a7e91..d763e40e2ed5c4 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.h
+++ b/be/src/vec/exec/format/parquet/schema_desc.h
@@ -91,6 +91,10 @@ class FieldDescriptor {
     TypeDescriptor get_doris_type(const tparquet::SchemaElement& physical_schema);
 
 public:
+    // org.apache.iceberg.avro.AvroSchemaUtil#sanitize encodes special characters,
+    // so we have to decode them here
+    void iceberg_sanitize(const std::vector<std::string>& read_columns);
+
     FieldDescriptor() = default;
     ~FieldDescriptor() = default;
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_file_metadata.h b/be/src/vec/exec/format/parquet/vparquet_file_metadata.h
index 6f52ef5b4afb37..5d745a0db62f41 100644
--- a/be/src/vec/exec/format/parquet/vparquet_file_metadata.h
+++ b/be/src/vec/exec/format/parquet/vparquet_file_metadata.h
@@ -32,6 +32,9 @@ class FileMetaData {
     Status init_schema();
     const FieldDescriptor& schema() const { return _schema; }
     const tparquet::FileMetaData& to_thrift();
+    void iceberg_sanitize(const std::vector<std::string>& read_columns) {
+        _schema.iceberg_sanitize(read_columns);
+    }
     std::string debug_string() const;
 
 private:
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 49798ed4f13f72..124f623f2e9959 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -294,6 +294,12 @@ void ParquetReader::_init_file_description() {
     }
 }
 
+void ParquetReader::iceberg_sanitize(const std::vector<std::string>& read_columns) {
+    if (_file_metadata != nullptr) {
+        _file_metadata->iceberg_sanitize(read_columns);
+    }
+}
+
 Status ParquetReader::init_reader(
         const std::vector<std::string>& all_column_names,
         const std::vector<std::string>& missing_column_names,
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 5ceca55d7ecd24..6efd0bd7237d7c 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -139,6 +139,9 @@ class ParquetReader : public GenericReader {
 
     const tparquet::FileMetaData* get_meta_data() const { return _t_metadata; }
 
+    // Only for iceberg reader to sanitize invalid column names
+    void iceberg_sanitize(const std::vector<std::string>& read_columns);
+
     Status set_fill_columns(
             const std::unordered_map<std::string, std::tuple<std::string, SlotDescriptor*>>&
                     partition_columns,
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 9f98a0ae3f42ff..bcaa99143f05d1 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -129,6 +129,7 @@ Status IcebergTableReader::init_reader(
     _gen_file_col_names();
     _gen_new_colname_to_value_range();
     parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);
+    parquet_reader->iceberg_sanitize(_all_required_col_names);
     Status status = parquet_reader->init_reader(
             _all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range,
             conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id,
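
Note that the decode side never un-escapes anything: iceberg_sanitize()
re-encodes each requested column name and, on a hit, re-keys the reader's
name-to-field map back to the original name. A minimal sketch of that
re-keying, with a hypothetical stand-in for the real FieldSchema:

    // rekey_sketch.cpp - illustration only, not the BE code.
    #include <iostream>
    #include <string>
    #include <unordered_map>

    struct FieldSchema {
        std::string name; // stand-in for the parquet field descriptor
    };

    int main() {
        // The parquet footer stores the Avro-sanitized name.
        FieldSchema field {"_x2fdsd_x2fsv_cnt_grp1"};
        std::unordered_map<std::string, const FieldSchema*> name_to_field {
                {field.name, &field}};

        // The query asks for the table column name; probe with its sanitized form.
        std::string requested = "/dsd/sv_cnt_grp1";
        std::string sanitized = "_x2fdsd_x2fsv_cnt_grp1"; // sanitize_avro_name(requested)
        auto it = name_to_field.find(sanitized);
        if (it != name_to_field.end()) {
            auto* schema = const_cast<FieldSchema*>(it->second);
            schema->name = requested; // later lookups see the table name
            name_to_field.emplace(requested, schema);
            name_to_field.erase(sanitized);
        }
        std::cout << name_to_field.count(requested) << "\n"; // prints 1
        return 0;
    }

The call site above is deliberate: the re-keying has to happen before
ParquetReader::init_reader() resolves _all_required_col_names against the
footer schema.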
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 7f35422ec030a1..fd77b084bb9e94 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -418,7 +418,7 @@ Status VFileScanner::_cast_to_input_block(Block* block) {
 }
 
 Status VFileScanner::_fill_columns_from_path(size_t rows) {
-    for (auto& kv : *_partition_columns) {
+    for (auto& kv : _partition_col_descs) {
         auto doris_column = _src_block_ptr->get_by_name(kv.first).column;
         IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
         auto& [value, slot_desc] = kv.second;
@@ -437,7 +437,7 @@ Status VFileScanner::_fill_missing_columns(size_t rows) {
     }
 
     SCOPED_TIMER(_fill_missing_columns_timer);
-    for (auto& kv : *_missing_columns) {
+    for (auto& kv : _missing_col_descs) {
         if (kv.second == nullptr) {
             // no default column, fill with null
             auto nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
@@ -862,9 +862,8 @@ Status VFileScanner::_get_next_reader() {
 }
 
 Status VFileScanner::_generate_fill_columns() {
-    _partition_columns.reset(
-            new std::unordered_map<std::string, std::tuple<std::string, SlotDescriptor*>>());
-    _missing_columns.reset(new std::unordered_map<std::string, VExprContextSPtr>());
+    _partition_col_descs.clear();
+    _missing_col_descs.clear();
 
     const TFileRangeDesc& range = _ranges.at(_next_range - 1);
     if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
@@ -881,8 +880,8 @@ Status VFileScanner::_generate_fill_columns() {
                 if (size == 4 && memcmp(data, "null", 4) == 0) {
                     data = TextConverter::NULL_STR;
                 }
-                _partition_columns->emplace(slot_desc->col_name(),
-                                            std::make_tuple(data, slot_desc));
+                _partition_col_descs.emplace(slot_desc->col_name(),
+                                             std::make_tuple(data, slot_desc));
             }
         }
     }
@@ -901,16 +900,11 @@ Status VFileScanner::_generate_fill_columns() {
                 return Status::InternalError("failed to find default value expr for slot: {}",
                                              slot_desc->col_name());
             }
-            _missing_columns->emplace(slot_desc->col_name(), it->second);
+            _missing_col_descs.emplace(slot_desc->col_name(), it->second);
         }
     }
 
-    RETURN_IF_ERROR(_cur_reader->set_fill_columns(*_partition_columns, *_missing_columns));
-    if (_cur_reader->fill_all_columns()) {
-        _partition_columns.reset(nullptr);
-        _missing_columns.reset(nullptr);
-    }
-    return Status::OK();
+    return _cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs);
 }
 
 Status VFileScanner::_init_expr_ctxes() {
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index 3611785625d338..e3533ce05c2456 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -162,9 +162,9 @@ class VFileScanner : public VScanner {
     std::unique_ptr<FileCacheStatistics> _file_cache_statistics;
     std::unique_ptr<io::IOContext> _io_ctx;
 
-    std::unique_ptr<std::unordered_map<std::string, std::tuple<std::string, SlotDescriptor*>>>
-            _partition_columns;
-    std::unique_ptr<std::unordered_map<std::string, VExprContextSPtr>> _missing_columns;
+    std::unordered_map<std::string, std::tuple<std::string, SlotDescriptor*>>
+            _partition_col_descs;
+    std::unordered_map<std::string, VExprContextSPtr> _missing_col_descs;
 
 private:
     RuntimeProfile::Counter* _get_block_timer = nullptr;
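
The scanner-side change is an ownership simplification rather than a
behavior change: the per-range fill maps become plain members that are
cleared and rebuilt for every range, instead of heap-allocated maps that
were conditionally reset after being handed to the reader. A minimal sketch
of the new shape (ScannerSketch and the simplified value types are
hypothetical):

    // scanner_sketch.cpp - illustration of the member-map pattern only.
    #include <string>
    #include <tuple>
    #include <unordered_map>

    struct SlotDescriptor; // opaque here

    class ScannerSketch {
    public:
        // Called once per file range, mirroring _generate_fill_columns().
        void generate_fill_columns() {
            _partition_col_descs.clear(); // reuse the same storage across ranges
            _missing_col_descs.clear();
            // ... repopulate from the current range description ...
        }

    private:
        std::unordered_map<std::string, std::tuple<std::string, SlotDescriptor*>>
                _partition_col_descs;
        std::unordered_map<std::string, std::string> _missing_col_descs; // expr stand-in
    };

    int main() {
        ScannerSketch scanner;
        scanner.generate_fill_columns();
        return 0;
    }

Because the maps now stay alive for the whole range, the old
fill_all_columns() branch that reset them after set_fill_columns() is no
longer needed.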
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index 2729fdf7a95cb1..0243ad12f75770 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -435,7 +435,7 @@ public List<Column> initSchema() {
         } else {
             List<Column> tmpSchema = Lists.newArrayListWithCapacity(schema.size());
             for (FieldSchema field : schema) {
-                tmpSchema.add(new Column(field.getName(),
+                tmpSchema.add(new Column(field.getName().toLowerCase(Locale.ROOT),
                         HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
                         true, field.getComment(), true, -1));
             }
@@ -484,7 +484,7 @@ private List<Column> getIcebergSchema(List<FieldSchema> hmsSchema) {
         Schema schema = icebergTable.schema();
         List<Column> tmpSchema = Lists.newArrayListWithCapacity(hmsSchema.size());
         for (FieldSchema field : hmsSchema) {
-            tmpSchema.add(new Column(field.getName(),
+            tmpSchema.add(new Column(field.getName().toLowerCase(Locale.ROOT),
                     HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType(),
                             IcebergExternalTable.ICEBERG_DATETIME_SCALE_MS),
                     true, null, true, false, null, field.getComment(), true, null,
@@ -500,7 +500,7 @@ protected void initPartitionColumns(List<Column> schema) {
         for (String partitionKey : partitionKeys) {
             // Do not use "getColumn()", which will cause dead loop
             for (Column column : schema) {
-                if (partitionKey.equals(column.getName())) {
+                if (partitionKey.equalsIgnoreCase(column.getName())) {
                     // For partition column, if it is string type, change it to varchar(65535)
                     // to be same as doris managed table.
                     // This is to avoid some unexpected behavior such as different partition pruning result
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
index bede9b99e43c47..7398ff19c9eed5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
@@ -36,6 +36,7 @@
 
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Optional;
 
 public class IcebergExternalTable extends ExternalTable {
@@ -66,7 +67,7 @@ public List<Column> initSchema() {
         List<Types.NestedField> columns = schema.columns();
         List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
         for (Types.NestedField field : columns) {
-            tmpSchema.add(new Column(field.name(),
+            tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
                     icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true,
                     schema.caseInsensitiveFindField(field.name()).fieldId()));
         }
diff --git a/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_common.out b/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_common.out
index 9554f1d21f0730..a51bac0e1b541e 100644
--- a/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_common.out
+++ b/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_common.out
@@ -1,3 +1,11 @@
 -- This file is automatically generated. You should know what you did if you want to edit this
 -- !q01 --
-599715
\ No newline at end of file
+599715
+
+-- !sanitize_mara --
+MATNR1	3.140	/DSD/SV_CNT_GRP1
+MATNR2	3.240	/DSD/SV_CNT_GRP2
+MATNR4	3.440	/DSD/SV_CNT_GRP4
+MATNR5	3.540	/DSD/SV_CNT_GRP5
+MATNR6	3.640	/DSD/SV_CNT_GRP6
+
diff --git a/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_common.groovy b/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_common.groovy
index a035ea6d1b3a80..577a4e6702a7fe 100644
--- a/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_common.groovy
+++ b/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_common.groovy
@@ -46,5 +46,8 @@ suite("test_external_catalog_iceberg_common", "p2,external,iceberg,external_remo
         }
         sql """ use `iceberg_catalog`; """
         q01_parquet()
+
+        // test special characters in table field names
+        qt_sanitize_mara """select MaTnR, NtgEW, `/dsd/Sv_cnt_grP` from sanitize_mara order by mAtNr"""
     }
 }
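
The FE half of the fix is a case rule rather than an encoding rule:
initSchema() now stores lower-cased column names, so partition keys, which
keep their metastore case, must be matched with equalsIgnoreCase. A small
sketch of that matching contract (illustration only, written in C++ to match
the other sketches; the authoritative code is the Java above):

    // case_rule_sketch.cpp - models the lower-case/ignore-case pairing.
    #include <algorithm>
    #include <cassert>
    #include <cctype>
    #include <string>

    static std::string lower(std::string s) {
        std::transform(s.begin(), s.end(), s.begin(),
                       [](unsigned char c) { return std::tolower(c); });
        return s;
    }

    int main() {
        std::string column = lower("MatNr");    // stored form, like initSchema()
        std::string partition_key = "MatNr";    // metastore keeps original case
        assert(lower(partition_key) == column); // the equalsIgnoreCase contract
        return 0;
    }

This pairing is what qt_sanitize_mara exercises end to end: the projection
mixes cases freely (MaTnR, NtgEW, `/dsd/Sv_cnt_grP`) and still resolves
against the lower-cased FE schema plus the sanitized parquet field names.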