67 changes: 67 additions & 0 deletions be/src/vec/exec/format/parquet/schema_desc.cpp
@@ -26,6 +26,7 @@
#include "common/logging.h"
#include "runtime/define_primitive_type.h"
#include "util/slice.h"
#include "util/string_util.h"

namespace doris::vectorized {

@@ -239,6 +240,72 @@ TypeDescriptor FieldDescriptor::get_doris_type(const tparquet::SchemaElement& ph
return type;
}

// Copied from org.apache.iceberg.avro.AvroSchemaUtil#validAvroName
static bool is_valid_avro_name(const std::string& name) {
int length = name.length();
char first = name[0];
if (!isalpha(first) && first != '_') {
return false;
}

for (int i = 1; i < length; i++) {
char character = name[i];
if (!isalpha(character) && !isdigit(character) && character != '_') {
return false;
}
}
return true;
}

// Copied from org.apache.iceberg.avro.AvroSchemaUtil#sanitize (single-character helper)
static void sanitize_avro_name(std::ostringstream& buf, char character) {
if (isdigit(character)) {
buf << '_' << character;
} else {
std::stringstream ss;
ss << std::hex << (int)character;
std::string hex_str = ss.str();
buf << "_x" << doris::to_lower(hex_str);
}
}

// Copied from org.apache.iceberg.avro.AvroSchemaUtil#sanitize
static std::string sanitize_avro_name(const std::string& name) {
std::ostringstream buf;
int length = name.length();
char first = name[0];
if (!isalpha(first) && first != '_') {
sanitize_avro_name(buf, first);
} else {
buf << first;
}

for (int i = 1; i < length; i++) {
char character = name[i];
if (!isalpha(character) && !isdigit(character) && character != '_') {
sanitize_avro_name(buf, character);
} else {
buf << character;
}
}
return buf.str();
}

void FieldDescriptor::iceberg_sanitize(const std::vector<std::string>& read_columns) {
for (const std::string& col : read_columns) {
if (!is_valid_avro_name(col)) {
std::string sanitize_name = sanitize_avro_name(col);
auto it = _name_to_field.find(sanitize_name);
if (it != _name_to_field.end()) {
FieldSchema* schema = const_cast<FieldSchema*>(it->second);
schema->name = col;
_name_to_field.emplace(col, schema);
_name_to_field.erase(sanitize_name);
}
}
}
}

TypeDescriptor FieldDescriptor::convert_to_doris_type(tparquet::LogicalType logicalType) {
TypeDescriptor type;
if (logicalType.__isset.STRING) {
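For reference, here is a self-contained sketch of the name mapping these helpers reverse. It mirrors the logic of `is_valid_avro_name` and `sanitize_avro_name` above; the standalone `sanitize` function, `main`, and the example values are illustrative only, not part of the patch:

```cpp
#include <cctype>
#include <iostream>
#include <sstream>
#include <string>

// Re-statement of the mapping implemented above: a name is a valid Avro
// name iff it matches [A-Za-z_][A-Za-z0-9_]*; every offending character
// is encoded as "_x" + lowercase hex, except a leading digit, which is
// simply prefixed with '_'.
static std::string sanitize(const std::string& name) {
    std::ostringstream buf;
    for (size_t i = 0; i < name.size(); ++i) {
        unsigned char c = name[i];
        bool valid = (i == 0) ? (std::isalpha(c) || c == '_')
                              : (std::isalnum(c) || c == '_');
        if (valid) {
            buf << name[i];
        } else if (i == 0 && std::isdigit(c)) {
            buf << '_' << name[i];
        } else {
            buf << "_x" << std::hex << static_cast<int>(c) << std::dec;
        }
    }
    return buf.str();
}

int main() {
    // '/' is 0x2f, so an Iceberg writer stores this Hive column as
    // "_x2fdsd_x2fsv_cnt_grp" in the Parquet schema.
    std::cout << sanitize("/dsd/sv_cnt_grp") << '\n'; // _x2fdsd_x2fsv_cnt_grp
    std::cout << sanitize("2cnt_grp") << '\n';        // _2cnt_grp
}
```

With that mapping, `iceberg_sanitize` above simply re-encodes each requested column name, looks the encoded form up in `_name_to_field`, and renames the matched field back to the original name.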
4 changes: 4 additions & 0 deletions be/src/vec/exec/format/parquet/schema_desc.h
@@ -91,6 +91,10 @@ class FieldDescriptor {
TypeDescriptor get_doris_type(const tparquet::SchemaElement& physical_schema);

public:
// org.apache.iceberg.avro.AvroSchemaUtil#sanitize encodes special characters in column names,
// so we have to decode them back to the original names when reading
void iceberg_sanitize(const std::vector<std::string>& read_columns);

FieldDescriptor() = default;
~FieldDescriptor() = default;

3 changes: 3 additions & 0 deletions be/src/vec/exec/format/parquet/vparquet_file_metadata.h
@@ -32,6 +32,9 @@ class FileMetaData {
Status init_schema();
const FieldDescriptor& schema() const { return _schema; }
const tparquet::FileMetaData& to_thrift();
void iceberg_sanitize(const std::vector<std::string>& read_columns) {
_schema.iceberg_sanitize(read_columns);
}
std::string debug_string() const;

private:
6 changes: 6 additions & 0 deletions be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -294,6 +294,12 @@ void ParquetReader::_init_file_description() {
}
}

void ParquetReader::iceberg_sanitize(const std::vector<std::string>& read_columns) {
if (_file_metadata != nullptr) {
_file_metadata->iceberg_sanitize(read_columns);
}
}

Status ParquetReader::init_reader(
const std::vector<std::string>& all_column_names,
const std::vector<std::string>& missing_column_names,
3 changes: 3 additions & 0 deletions be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -139,6 +139,9 @@ class ParquetReader : public GenericReader {

const tparquet::FileMetaData* get_meta_data() const { return _t_metadata; }

// Only for the Iceberg reader: restore column names that were sanitized as invalid Avro names
void iceberg_sanitize(const std::vector<std::string>& read_columns);

Status set_fill_columns(
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
partition_columns,
1 change: 1 addition & 0 deletions be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -129,6 +129,7 @@ Status IcebergTableReader::init_reader(
_gen_file_col_names();
_gen_new_colname_to_value_range();
parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);
parquet_reader->iceberg_sanitize(_all_required_col_names);
Status status = parquet_reader->init_reader(
_all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range,
conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id,
22 changes: 8 additions & 14 deletions be/src/vec/exec/scan/vfile_scanner.cpp
@@ -418,7 +418,7 @@ Status VFileScanner::_cast_to_input_block(Block* block) {
}

Status VFileScanner::_fill_columns_from_path(size_t rows) {
for (auto& kv : *_partition_columns) {
for (auto& kv : _partition_col_descs) {
auto doris_column = _src_block_ptr->get_by_name(kv.first).column;
IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
auto& [value, slot_desc] = kv.second;
Expand All @@ -437,7 +437,7 @@ Status VFileScanner::_fill_missing_columns(size_t rows) {
}

SCOPED_TIMER(_fill_missing_columns_timer);
for (auto& kv : *_missing_columns) {
for (auto& kv : _missing_col_descs) {
if (kv.second == nullptr) {
// no default column, fill with null
auto nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
@@ -862,9 +862,8 @@ Status VFileScanner::_get_next_reader() {
}

Status VFileScanner::_generate_fill_columns() {
_partition_columns.reset(
new std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>());
_missing_columns.reset(new std::unordered_map<std::string, VExprContextSPtr>());
_partition_col_descs.clear();
_missing_col_descs.clear();

const TFileRangeDesc& range = _ranges.at(_next_range - 1);
if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
@@ -881,8 +880,8 @@
if (size == 4 && memcmp(data, "null", 4) == 0) {
data = TextConverter::NULL_STR;
}
_partition_columns->emplace(slot_desc->col_name(),
std::make_tuple(data, slot_desc));
_partition_col_descs.emplace(slot_desc->col_name(),
std::make_tuple(data, slot_desc));
}
}
}
@@ -901,16 +900,11 @@
return Status::InternalError("failed to find default value expr for slot: {}",
slot_desc->col_name());
}
_missing_columns->emplace(slot_desc->col_name(), it->second);
_missing_col_descs.emplace(slot_desc->col_name(), it->second);
}
}

RETURN_IF_ERROR(_cur_reader->set_fill_columns(*_partition_columns, *_missing_columns));
if (_cur_reader->fill_all_columns()) {
_partition_columns.reset(nullptr);
_missing_columns.reset(nullptr);
}
return Status::OK();
return _cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs);
}

Status VFileScanner::_init_expr_ctxes() {
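The net effect of this scanner change: the fill-column maps become plain value members that are cleared and rebuilt per range, instead of heap-allocated maps that were reset to nullptr once the reader reported fill_all_columns(). A condensed sketch of the new ownership pattern (types are simplified stand-ins; only the two member names come from the diff):

```cpp
#include <memory>
#include <string>
#include <tuple>
#include <unordered_map>

// Stand-ins for the real Doris types, just so the sketch is complete.
struct SlotDescriptor;
using VExprContextSPtr = std::shared_ptr<struct VExprContext>;

class ScannerSketch {
public:
    // Called once per file range: clear() keeps the maps alive across
    // ranges, so references handed to the reader never dangle and no
    // unique_ptr reset/null-check dance is needed.
    void generate_fill_columns() {
        _partition_col_descs.clear();
        _missing_col_descs.clear();
        // ... repopulate from the current range, then pass both maps
        // by reference via the reader's set_fill_columns(...).
    }

private:
    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
            _partition_col_descs;
    std::unordered_map<std::string, VExprContextSPtr> _missing_col_descs;
};
```

Compared with the old unique_ptr members, this drops the null checks in `_fill_columns_from_path` / `_fill_missing_columns` and the conditional reset after `set_fill_columns`, as the diff below shows.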
6 changes: 3 additions & 3 deletions be/src/vec/exec/scan/vfile_scanner.h
@@ -162,9 +162,9 @@ class VFileScanner : public VScanner {
std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
std::unique_ptr<io::IOContext> _io_ctx;

std::unique_ptr<std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>>
_partition_columns;
std::unique_ptr<std::unordered_map<std::string, VExprContextSPtr>> _missing_columns;
std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
_partition_col_descs;
std::unordered_map<std::string, VExprContextSPtr> _missing_col_descs;

private:
RuntimeProfile::Counter* _get_block_timer = nullptr;
@@ -435,7 +435,7 @@ public List<Column> initSchema() {
} else {
List<Column> tmpSchema = Lists.newArrayListWithCapacity(schema.size());
for (FieldSchema field : schema) {
tmpSchema.add(new Column(field.getName(),
tmpSchema.add(new Column(field.getName().toLowerCase(Locale.ROOT),
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
true, field.getComment(), true, -1));
}
@@ -484,7 +484,7 @@ private List<Column> getIcebergSchema(List<FieldSchema> hmsSchema) {
Schema schema = icebergTable.schema();
List<Column> tmpSchema = Lists.newArrayListWithCapacity(hmsSchema.size());
for (FieldSchema field : hmsSchema) {
tmpSchema.add(new Column(field.getName(),
tmpSchema.add(new Column(field.getName().toLowerCase(Locale.ROOT),
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType(),
IcebergExternalTable.ICEBERG_DATETIME_SCALE_MS),
true, null, true, false, null, field.getComment(), true, null,
@@ -500,7 +500,7 @@ protected void initPartitionColumns(List<Column> schema) {
for (String partitionKey : partitionKeys) {
// Do not use "getColumn()", which will cause dead loop
for (Column column : schema) {
if (partitionKey.equals(column.getName())) {
if (partitionKey.equalsIgnoreCase(column.getName())) {
// For partition column, if it is string type, change it to varchar(65535)
// to be same as doris managed table.
// This is to avoid some unexpected behavior such as different partition pruning result
@@ -36,6 +36,7 @@

import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Optional;

public class IcebergExternalTable extends ExternalTable {
@@ -66,7 +67,7 @@ public List<Column> initSchema() {
List<Types.NestedField> columns = schema.columns();
List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
for (Types.NestedField field : columns) {
tmpSchema.add(new Column(field.name(),
tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true,
schema.caseInsensitiveFindField(field.name()).fieldId()));
}
@@ -1,3 +1,11 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !q01 --
599715
599715

-- !sanitize_mara --
MATNR1 3.140 /DSD/SV_CNT_GRP1
MATNR2 3.240 /DSD/SV_CNT_GRP2
MATNR4 3.440 /DSD/SV_CNT_GRP4
MATNR5 3.540 /DSD/SV_CNT_GRP5
MATNR6 3.640 /DSD/SV_CNT_GRP6

@@ -46,5 +46,8 @@ suite("test_external_catalog_iceberg_common", "p2,external,iceberg,external_remo
}
sql """ use `iceberg_catalog`; """
q01_parquet()

// test special characters in table field names
qt_sanitize_mara """select MaTnR, NtgEW, `/dsd/Sv_cnt_grP` from sanitize_mara order by mAtNr"""
}
}