diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index 737c196736c5c3..7b770258fb2b97 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -179,4 +179,18 @@ bool BaseScanner::fill_dest_tuple(const Slice& line, Tuple* dest_tuple, MemPool* } return true; } + +void BaseScanner::fill_slots_of_columns_from_path(int start, const std::vector<std::string>& columns_from_path) { + // values of columns from path can not be null + for (int i = 0; i < columns_from_path.size(); ++i) { + auto slot_desc = _src_slot_descs.at(i + start); + _src_tuple->set_not_null(slot_desc->null_indicator_offset()); + void* slot = _src_tuple->get_slot(slot_desc->tuple_offset()); + StringValue* str_slot = reinterpret_cast<StringValue*>(slot); + const std::string& column_from_path = columns_from_path[i]; + str_slot->ptr = const_cast<char*>(column_from_path.c_str()); + str_slot->len = column_from_path.size(); + } +} + } diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index 8078c36c6e8f0c..884df553b0103f 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -64,6 +64,8 @@ class BaseScanner { virtual void close() = 0; bool fill_dest_tuple(const Slice& line, Tuple* dest_tuple, MemPool* mem_pool); + void fill_slots_of_columns_from_path(int start, const std::vector<std::string>& columns_from_path); + protected: RuntimeState* _state; const TBrokerScanRangeParams& _params; diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index 19cd829f6826e6..2fc7ff42294070 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -40,7 +40,7 @@ namespace doris { BrokerScanner::BrokerScanner(RuntimeState* state, RuntimeProfile* profile, - const TBrokerScanRangeParams& params, + const TBrokerScanRangeParams& params, const std::vector<TBrokerRangeDesc>& ranges, const std::vector<TNetworkAddress>& broker_addresses, ScannerCounter* counter) : BaseScanner(state, profile, params, counter), @@ -73,7 +73,7 @@ Status BrokerScanner::open() { Status BrokerScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { SCOPED_TIMER(_read_timer); - // Get one line + // Get one line while (!_scanner_eof) { if (_cur_line_reader == nullptr || _cur_line_reader_eof) { RETURN_IF_ERROR(open_next_reader()); @@ -120,7 +120,7 @@ Status BrokerScanner::open_next_reader() { RETURN_IF_ERROR(open_file_reader()); RETURN_IF_ERROR(open_line_reader()); _next_range++; - + return Status::OK(); } @@ -379,8 +379,8 @@ bool BrokerScanner::check_decimal_input( } bool is_null(const Slice& slice) { - return slice.size == 2 && - slice.data[0] == '\\' && + return slice.size == 2 && + slice.data[0] == '\\' && slice.data[1] == 'N'; } @@ -469,7 +469,10 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) { split_line(line, &values); } - if (values.size() < _src_slot_descs.size()) { + // range of current file + const TBrokerRangeDesc& range = _ranges.at(_next_range - 1); + const std::vector<std::string>& columns_from_path = range.columns_from_path; + if (values.size() + columns_from_path.size() < _src_slot_descs.size()) { std::stringstream error_msg; error_msg << "actual column number is less than schema column number. 
" << "actual number: " << values.size() << " sep: " << _value_separator << ", " @@ -478,7 +481,7 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) { error_msg.str()); _counter->num_rows_filtered++; return false; - } else if (values.size() > _src_slot_descs.size()) { + } else if (values.size() + columns_from_path.size() > _src_slot_descs.size()) { std::stringstream error_msg; error_msg << "actual column number is more than schema column number. " << "actual number: " << values.size() << " sep: " << _value_separator << ", " @@ -503,6 +506,8 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) { str_slot->len = value.size; } + fill_slots_of_columns_from_path(range.num_of_columns_from_file, columns_from_path); + return true; } diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h index b9a798694dc8d9..9fa8c824c072c9 100644 --- a/be/src/exec/broker_scanner.h +++ b/be/src/exec/broker_scanner.h @@ -55,7 +55,7 @@ class BrokerScanner : public BaseScanner { BrokerScanner( RuntimeState* state, RuntimeProfile* profile, - const TBrokerScanRangeParams& params, + const TBrokerScanRangeParams& params, const std::vector& ranges, const std::vector& broker_addresses, ScannerCounter* counter); @@ -123,7 +123,7 @@ private:; bool _scanner_eof; - // When we fetch range doesn't start from 0, + // When we fetch range doesn't start from 0, // we will read to one ahead, and skip the first line bool _skip_next_line; diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/parquet_reader.cpp index 795688f0ea964e..69181dd5975184 100644 --- a/be/src/exec/parquet_reader.cpp +++ b/be/src/exec/parquet_reader.cpp @@ -34,8 +34,8 @@ namespace doris { // Broker -ParquetReaderWrap::ParquetReaderWrap(FileReader *file_reader) : - _total_groups(0), _current_group(0), _rows_of_group(0), _current_line_of_group(0) { +ParquetReaderWrap::ParquetReaderWrap(FileReader *file_reader, int32_t num_of_columns_from_file) : + _num_of_columns_from_file(num_of_columns_from_file), _total_groups(0), _current_group(0), _rows_of_group(0), _current_line_of_group(0) { _parquet = std::shared_ptr(new ParquetFile(file_reader)); _properties = parquet::ReaderProperties(); _properties.enable_buffered_stream(); @@ -122,16 +122,18 @@ inline void ParquetReaderWrap::fill_slot(Tuple* tuple, SlotDescriptor* slot_desc Status ParquetReaderWrap::column_indices(const std::vector& tuple_slot_descs) { _parquet_column_ids.clear(); - for (auto slot_desc : tuple_slot_descs) { + for (int i = 0; i < _num_of_columns_from_file; i++) { + auto slot_desc = tuple_slot_descs.at(i); // Get the Column Reader for the boolean column auto iter = _map_column.find(slot_desc->col_name()); - if (iter == _map_column.end()) { + if (iter != _map_column.end()) { + _parquet_column_ids.emplace_back(iter->second); + } else { std::stringstream str_error; str_error << "Invalid Column Name:" << slot_desc->col_name(); LOG(WARNING) << str_error.str(); return Status::InvalidArgument(str_error.str()); } - _parquet_column_ids.emplace_back(iter->second); } return Status::OK(); } @@ -206,7 +208,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& const uint8_t *value = nullptr; int column_index = 0; try { - size_t slots = tuple_slot_descs.size(); + size_t slots = _parquet_column_ids.size(); for (size_t i = 0; i < slots; ++i) { auto slot_desc = tuple_slot_descs[i]; column_index = i;// column index in batch record @@ -396,6 +398,35 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& } break; } + case arrow::Type::type::DATE32: { + auto 
ts_array = std::dynamic_pointer_cast(_batch->column(column_index)); + if (ts_array->IsNull(_current_line_of_group)) { + RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); + } else { + time_t timestamp = (time_t)((int64_t)ts_array->Value(_current_line_of_group) * 24 * 60 * 60); + struct tm local; + localtime_r(×tamp, &local); + char* to = reinterpret_cast(&tmp_buf); + wbytes = (uint32_t)strftime(to, 64, "%Y-%m-%d", &local); + fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes); + } + break; + } + case arrow::Type::type::DATE64: { + auto ts_array = std::dynamic_pointer_cast(_batch->column(column_index)); + if (ts_array->IsNull(_current_line_of_group)) { + RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); + } else { + // convert milliseconds to seconds + time_t timestamp = (time_t)((int64_t)ts_array->Value(_current_line_of_group) / 1000); + struct tm local; + localtime_r(×tamp, &local); + char* to = reinterpret_cast(&tmp_buf); + wbytes = (uint32_t)strftime(to, 64, "%Y-%m-%d %H:%M:%S", &local); + fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes); + } + break; + } default: { // other type not support. std::stringstream str_error; diff --git a/be/src/exec/parquet_reader.h b/be/src/exec/parquet_reader.h index defe3d9ebf09fd..2106c74ff937bf 100644 --- a/be/src/exec/parquet_reader.h +++ b/be/src/exec/parquet_reader.h @@ -68,7 +68,7 @@ class ParquetFile : public arrow::io::RandomAccessFile { // Reader of broker parquet file class ParquetReaderWrap { public: - ParquetReaderWrap(FileReader *file_reader); + ParquetReaderWrap(FileReader *file_reader, int32_t num_of_columns_from_file); virtual ~ParquetReaderWrap(); // Read @@ -85,6 +85,7 @@ class ParquetReaderWrap { Status handle_timestamp(const std::shared_ptr& ts_array, uint8_t *buf, int32_t *wbtyes); private: + const int32_t _num_of_columns_from_file; parquet::ReaderProperties _properties; std::shared_ptr _parquet; diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index 2ce43a121956a1..b0e385560f76db 100644 --- a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -70,6 +70,9 @@ Status ParquetScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { _cur_file_eof = false; } RETURN_IF_ERROR(_cur_file_reader->read(_src_tuple, _src_slot_descs, tuple_pool, &_cur_file_eof)); + // range of current file + const TBrokerRangeDesc& range = _ranges.at(_next_range - 1); + fill_slots_of_columns_from_path(range.num_of_columns_from_file, range.columns_from_path); { COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); @@ -141,7 +144,7 @@ Status ParquetScanner::open_next_reader() { file_reader->close(); continue; } - _cur_file_reader = new ParquetReaderWrap(file_reader.release()); + _cur_file_reader = new ParquetReaderWrap(file_reader.release(), range.num_of_columns_from_file); Status status = _cur_file_reader->init_parquet_reader(_src_slot_descs); if (status.is_end_of_file()) { continue; diff --git a/be/test/exec/broker_scan_node_test.cpp b/be/test/exec/broker_scan_node_test.cpp index 5ccb5506e3c1d7..44f302bf43aed8 100644 --- a/be/test/exec/broker_scan_node_test.cpp +++ b/be/test/exec/broker_scan_node_test.cpp @@ -153,7 +153,33 @@ void BrokerScanNodeTest::init_desc_table() { slot_desc.nullIndicatorByte = 0; slot_desc.nullIndicatorBit = -1; slot_desc.colName = "k3"; - slot_desc.slotIdx = 2; + slot_desc.slotIdx = 3; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + // k4(partitioned column) + { + TSlotDescriptor slot_desc; + + 
slot_desc.id = next_slot_id++; + slot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::INT); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = 1; + slot_desc.byteOffset = 12; + slot_desc.nullIndicatorByte = 0; + slot_desc.nullIndicatorBit = -1; + slot_desc.colName = "k4"; + slot_desc.slotIdx = 4; slot_desc.isMaterialized = true; t_desc_table.slotDescriptors.push_back(slot_desc); @@ -164,7 +190,7 @@ void BrokerScanNodeTest::init_desc_table() { // TTupleDescriptor dest TTupleDescriptor t_tuple_desc; t_tuple_desc.id = 0; - t_tuple_desc.byteSize = 12; + t_tuple_desc.byteSize = 16; t_tuple_desc.numNullBytes = 0; t_tuple_desc.tableId = 0; t_tuple_desc.__isset.tableId = true; @@ -251,7 +277,34 @@ void BrokerScanNodeTest::init_desc_table() { slot_desc.nullIndicatorByte = 0; slot_desc.nullIndicatorBit = -1; slot_desc.colName = "k3"; - slot_desc.slotIdx = 2; + slot_desc.slotIdx = 3; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + // k4(partitioned column) + { + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 1; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::VARCHAR); + scalar_type.__set_len(65535); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = 1; + slot_desc.byteOffset = 48; + slot_desc.nullIndicatorByte = 0; + slot_desc.nullIndicatorBit = -1; + slot_desc.colName = "k4"; + slot_desc.slotIdx = 4; slot_desc.isMaterialized = true; t_desc_table.slotDescriptors.push_back(slot_desc); @@ -261,7 +314,7 @@ void BrokerScanNodeTest::init_desc_table() { // TTupleDescriptor source TTupleDescriptor t_tuple_desc; t_tuple_desc.id = 1; - t_tuple_desc.byteSize = 48; + t_tuple_desc.byteSize = 60; t_tuple_desc.numNullBytes = 0; t_tuple_desc.tableId = 0; t_tuple_desc.__isset.tableId = true; @@ -276,7 +329,7 @@ void BrokerScanNodeTest::init_desc_table() { void BrokerScanNodeTest::init() { _params.column_separator = ','; _params.line_delimiter = '\n'; - + TTypeDesc int_type; { TTypeNode node; @@ -297,7 +350,7 @@ void BrokerScanNodeTest::init() { varchar_type.types.push_back(node); } - for (int i = 0; i < 3; ++i) { + for (int i = 0; i < 4; ++i) { TExprNode cast_expr; cast_expr.node_type = TExprNodeType::CAST_EXPR; cast_expr.type = int_type; @@ -319,7 +372,7 @@ void BrokerScanNodeTest::init() { slot_ref.type = varchar_type; slot_ref.num_children = 0; slot_ref.__isset.slot_ref = true; - slot_ref.slot_ref.slot_id = 4 + i; + slot_ref.slot_ref.slot_id = 5 + i; slot_ref.slot_ref.tuple_id = 1; TExpr expr; @@ -327,7 +380,7 @@ void BrokerScanNodeTest::init() { expr.nodes.push_back(slot_ref); _params.expr_of_dest_slot.emplace(i + 1, expr); - _params.src_slot_ids.push_back(4 + i); + _params.src_slot_ids.push_back(5 + i); } // _params.__isset.expr_of_dest_slot = true; _params.__set_dest_tuple_id(0); @@ -367,6 +420,9 @@ TEST_F(BrokerScanNodeTest, normal) { range.file_type = TFileType::FILE_LOCAL; range.format_type = TFileFormatType::FORMAT_CSV_PLAIN; range.splittable = true; + std::vector columns_from_path{"1"}; + range.__set_columns_from_path(columns_from_path); + range.__set_num_of_columns_from_file(3); broker_scan_range.ranges.push_back(range); 
scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range); @@ -386,6 +442,9 @@ TEST_F(BrokerScanNodeTest, normal) { range.file_type = TFileType::FILE_LOCAL; range.format_type = TFileFormatType::FORMAT_CSV_PLAIN; range.splittable = true; + std::vector columns_from_path{"2"}; + range.__set_columns_from_path(columns_from_path); + range.__set_num_of_columns_from_file(3); broker_scan_range.ranges.push_back(range); scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range); @@ -394,7 +453,7 @@ TEST_F(BrokerScanNodeTest, normal) { } scan_node.set_scan_ranges(scan_ranges); - + status = scan_node.open(&_runtime_state); ASSERT_TRUE(status.ok()); diff --git a/be/test/exec/parquet_scanner_test.cpp b/be/test/exec/parquet_scanner_test.cpp index 2d96e44cdd5b95..81ff2e1c0588bf 100644 --- a/be/test/exec/parquet_scanner_test.cpp +++ b/be/test/exec/parquet_scanner_test.cpp @@ -68,14 +68,14 @@ class ParquetSannerTest : public testing::Test { #define TUPLE_ID_DST 0 #define TUPLE_ID_SRC 1 -#define CLOMN_NUMBERS 19 +#define CLOMN_NUMBERS 20 #define DST_TUPLE_SLOT_ID_START 1 -#define SRC_TUPLE_SLOT_ID_START 20 +#define SRC_TUPLE_SLOT_ID_START 21 int ParquetSannerTest::create_src_tuple(TDescriptorTable& t_desc_table, int next_slot_id) { const char *clomnNames[] = {"log_version", "log_time", "log_time_stamp", "js_version", "vst_cookie", "vst_ip", "vst_user_id", "vst_user_agent", "device_resolution", "page_url", "page_refer_url", "page_yyid", "page_type", "pos_type", "content_id", "media_id", - "spm_cnt", "spm_pre", "scm_cnt"}; + "spm_cnt", "spm_pre", "scm_cnt", "partition_column"}; for (int i = 0; i < CLOMN_NUMBERS; i++) { TSlotDescriptor slot_desc; @@ -201,7 +201,7 @@ int ParquetSannerTest::create_dst_tuple(TDescriptorTable& t_desc_table, int next const char *clomnNames[] = {"log_version", "log_time", "log_time_stamp", "js_version", "vst_cookie", "vst_ip", "vst_user_id", "vst_user_agent", "device_resolution", "page_url", "page_refer_url", "page_yyid", "page_type", "pos_type", "content_id", "media_id", - "spm_cnt", "spm_pre", "scm_cnt"}; + "spm_cnt", "spm_pre", "scm_cnt", "partition_column"}; for (int i = 3; i < CLOMN_NUMBERS; i++, byteOffset+=16) { TSlotDescriptor slot_desc; @@ -435,6 +435,10 @@ TEST_F(ParquetSannerTest, normal) { range.size = -1; range.format_type = TFileFormatType::FORMAT_PARQUET; range.splittable = true; + + std::vector columns_from_path{"value"}; + range.__set_columns_from_path(columns_from_path); + range.__set_num_of_columns_from_file(19); #if 1 range.path = "./be/test/exec/test_data/parquet_scanner/localfile.parquet"; range.file_type = TFileType::FILE_LOCAL; diff --git a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/LOAD.md b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/LOAD.md index 34a7c8c8ad5690..865bfc3df431b9 100644 --- a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/LOAD.md +++ b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/LOAD.md @@ -49,6 +49,7 @@ [COLUMNS TERMINATED BY "column_separator"] [FORMAT AS "file_type"] [(column_list)] + [COLUMNS FROM PATH AS (columns_from_path)] [SET (k1 = func(k2))] 说明: @@ -81,6 +82,12 @@ 当需要跳过导入文件中的某一列时,将该列指定为 table 中不存在的列名即可。 语法: (col_name1, col_name2, ...) + + columns_from_path: + + 用于指定需要从文件路径中解析的字段。 + 语法: + (col_from_path_name1, col_from_path_name2, ...) SET: @@ -278,6 +285,22 @@ (k1, k2, k3) ) WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password"); + + 9. 
提取文件路径中的压缩字段 + 如果需要,则会根据表中定义的字段类型解析文件路径中的压缩字段(partitioned fields),类似Spark中Partition Discovery的功能 + LOAD LABEL example_db.label10 + ( + DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/*/*") + INTO TABLE `my_table` + FORMAT AS "csv" + (k1, k2, k3) + COLUMNS FROM PATH AS (city, utc_date) + SET (uniq_id = md5sum(k1, city)) + ) + WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password"); + + hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing目录下包括如下文件:[hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/utc_date=2019-06-26/0000.csv, hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/utc_date=2019-06-26/0001.csv, ...] + 则提取文件路径的中的city和utc_date字段 ## keyword LOAD diff --git a/fe/src/main/cup/sql_parser.cup b/fe/src/main/cup/sql_parser.cup index 5dbcfe99d66977..73985aa3c585d7 100644 --- a/fe/src/main/cup/sql_parser.cup +++ b/fe/src/main/cup/sql_parser.cup @@ -190,7 +190,7 @@ parser code {: :}; // Total keywords of doris -terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_AND, KW_ANTI, KW_AS, KW_ASC, KW_AUTHORS, +terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_AND, KW_ANTI, KW_AS, KW_ASC, KW_AUTHORS, KW_BACKEND, KW_BACKUP, KW_BETWEEN, KW_BEGIN, KW_BIGINT, KW_BOOLEAN, KW_BOTH, KW_BROKER, KW_BACKENDS, KW_BY, KW_CANCEL, KW_CASE, KW_CAST, KW_CHAIN, KW_CHAR, KW_CHARSET, KW_CLUSTER, KW_CLUSTERS, KW_COLLATE, KW_COLLATION, KW_COLUMN, KW_COLUMNS, KW_COMMENT, KW_COMMIT, KW_COMMITTED, @@ -201,7 +201,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_A KW_FALSE, KW_FOLLOWER, KW_FOLLOWING, KW_FREE, KW_FROM, KW_FILE, KW_FIRST, KW_FLOAT, KW_FOR, KW_FORMAT, KW_FRONTEND, KW_FRONTENDS, KW_FULL, KW_FUNCTION, KW_GLOBAL, KW_GRANT, KW_GRANTS, KW_GROUP, KW_HASH, KW_HAVING, KW_HELP,KW_HLL, KW_HLL_UNION, KW_HUB, - KW_IDENTIFIED, KW_IF, KW_IN, KW_INDEX, KW_INDEXES, KW_INFILE, + KW_IDENTIFIED, KW_IF, KW_IN, KW_INDEX, KW_INDEXES, KW_INFILE, KW_INNER, KW_INSERT, KW_INT, KW_INTERMEDIATE, KW_INTERVAL, KW_INTO, KW_IS, KW_ISNULL, KW_ISOLATION, KW_JOIN, KW_KEY, KW_KILL, @@ -211,7 +211,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_A KW_MAX, KW_MAX_VALUE, KW_MERGE, KW_MIN, KW_MIGRATE, KW_MIGRATIONS, KW_MODIFY, KW_NAME, KW_NAMES, KW_NEGATIVE, KW_NO, KW_NOT, KW_NULL, KW_NULLS, KW_OBSERVER, KW_OFFSET, KW_ON, KW_ONLY, KW_OPEN, KW_OR, KW_ORDER, KW_OUTER, KW_OVER, - KW_PARTITION, KW_PARTITIONS, KW_PRECEDING, + KW_PARTITION, KW_PARTITIONS, KW_PATH, KW_PRECEDING, KW_PASSWORD, KW_PLUGIN, KW_PLUGINS, KW_PRIMARY, KW_PROC, KW_PROCEDURE, KW_PROCESSLIST, KW_PROPERTIES, KW_PROPERTY, @@ -379,7 +379,7 @@ nonterminal LabelName job_label; nonterminal String opt_system; nonterminal String opt_cluster; nonterminal BrokerDesc opt_broker; -nonterminal List opt_col_list, col_list, opt_dup_keys; +nonterminal List opt_col_list, col_list, opt_dup_keys, opt_columns_from_path; nonterminal List opt_partitions, partitions; nonterminal List opt_col_mapping_list; nonterminal ColumnSeparator opt_field_term, column_separator; @@ -1051,9 +1051,10 @@ data_desc ::= opt_field_term:colSep opt_file_format:fileFormat opt_col_list:colList + opt_columns_from_path:columnsFromPath opt_col_mapping_list:colMappingList {: - RESULT = new DataDescription(tableName, partitionNames, files, colList, colSep, fileFormat, isNeg, colMappingList); + RESULT = new DataDescription(tableName, partitionNames, files, colList, colSep, fileFormat, columnsFromPath, isNeg, colMappingList); :} ; 
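A note on how the values behind COLUMNS FROM PATH AS are obtained: the grammar change above only records the column names; the values themselves are recovered on the FE from each file's directory segments (key=value pairs such as city=beijing), as implemented in BrokerUtil.parseColumnsFromPath later in this patch. The following is a simplified, self-contained Java sketch of that extraction, not the patch code itself — it reports errors with IllegalArgumentException instead of Doris' UserException and skips some of the real method's bookkeeping:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class PathColumnsSketch {
        // Extract the value of each requested partition column from a path such as
        // hdfs://host:port/dir/city=beijing/utc_date=2019-06-26/0000.csv.
        // Directory segments are scanned from the file name upwards, so the segment
        // closest to the file wins when a key appears more than once.
        static List<String> parseColumnsFromPath(String filePath, List<String> columnsFromPath) {
            String[] segments = filePath.split("/");
            String[] values = new String[columnsFromPath.size()];
            int found = 0;
            // segments[segments.length - 1] is the file name itself, so start one above it
            for (int i = segments.length - 2; i >= 0 && found < values.length; i--) {
                String segment = segments[i];
                if (segment.isEmpty()) {
                    continue; // tolerate duplicated slashes, e.g. k2=v2//k1=v1
                }
                int eq = segment.indexOf('=');
                if (eq < 0) {
                    throw new IllegalArgumentException("expected key=value segment, got: " + segment);
                }
                int index = columnsFromPath.indexOf(segment.substring(0, eq));
                if (index >= 0 && values[index] == null) {
                    values[index] = segment.substring(eq + 1);
                    found++;
                }
            }
            if (found != values.length) {
                throw new IllegalArgumentException(
                        "failed to parse " + columnsFromPath + " from path: " + filePath);
            }
            return new ArrayList<>(Arrays.asList(values));
        }

        public static void main(String[] args) {
            String path = "hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/utc_date=2019-06-26/0000.csv";
            // prints [beijing, 2019-06-26]
            System.out.println(parseColumnsFromPath(path, Arrays.asList("city", "utc_date")));
        }
    }

Because segments are scanned from the directory closest to the file upwards, a path like .../k1=v2/k1=v1/xxx.csv yields v1, matching the expectations in BrokerUtilTest further down in this patch.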
@@ -1110,6 +1111,13 @@ opt_file_format ::= {: RESULT = format; :} ; +opt_columns_from_path ::= + /* Empty */ + {: RESULT = null; :} + | KW_COLUMNS KW_FROM KW_PATH KW_AS LPAREN ident_list:columnsFromPath RPAREN + {: RESULT = columnsFromPath; :} + ; + opt_col_list ::= {: RESULT = null; @@ -3924,6 +3932,8 @@ keyword ::= {: RESULT = id; :} | KW_FORMAT:id {: RESULT = id; :} + | KW_PATH:id + {: RESULT = id; :} | KW_FUNCTION:id {: RESULT = id; :} | KW_END:id diff --git a/fe/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/src/main/java/org/apache/doris/analysis/DataDescription.java index 0623a38ea30928..c75809be124636 100644 --- a/fe/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -53,21 +53,22 @@ // [COLUMNS TERMINATED BY separator] // [FORMAT AS format] // [(tmp_col1, tmp_col2, col3, ...)] +// [COLUMNS FROM PATH AS (col1, ...)] // [SET (k1=f1(xx), k2=f2(xxx))] /** * The transform of columns should be added after the keyword named COLUMNS. * The transform after the keyword named SET is the old ways which only supports the hadoop function. - * It old way of transform will be removed gradually. It + * It old way of transform will be removed gradually. It */ public class DataDescription { private static final Logger LOG = LogManager.getLogger(DataDescription.class); public static String FUNCTION_HASH_HLL = "hll_hash"; private static final List hadoopSupportFunctionName = Arrays.asList("strftime", "time_format", - "alignment_timestamp", - "default_value", "md5sum", - "replace_value", "now", - "hll_hash"); + "alignment_timestamp", + "default_value", "md5sum", + "replace_value", "now", + "hll_hash"); private final String tableName; private final List partitionNames; private final List filePaths; @@ -75,6 +76,7 @@ public class DataDescription { private final List columns; private final ColumnSeparator columnSeparator; private final String fileFormat; + private final List columnsFromPath; private final boolean isNegative; private final List columnMappingList; @@ -101,12 +103,25 @@ public DataDescription(String tableName, String fileFormat, boolean isNegative, List columnMappingList) { + this(tableName, partitionNames, filePaths, columns, columnSeparator, fileFormat, null, isNegative, columnMappingList); + } + + public DataDescription(String tableName, + List partitionNames, + List filePaths, + List columns, + ColumnSeparator columnSeparator, + String fileFormat, + List columnsFromPath, + boolean isNegative, + List columnMappingList) { this.tableName = tableName; this.partitionNames = partitionNames; this.filePaths = filePaths; this.columns = columns; this.columnSeparator = columnSeparator; this.fileFormat = fileFormat; + this.columnsFromPath = columnsFromPath; this.isNegative = isNegative; this.columnMappingList = columnMappingList; } @@ -135,6 +150,10 @@ public String getFileFormat() { return fileFormat; } + public List getColumnsFromPath() { + return columnsFromPath; + } + public String getColumnSeparator() { if (columnSeparator == null) { return null; @@ -196,15 +215,26 @@ public boolean isHadoopLoad() { * "col2": "tmp_col2+1", "col3": "strftime("%Y-%m-%d %H:%M:%S", tmp_col3)"} */ private void analyzeColumns() throws AnalysisException { - if (columns == null || columns.isEmpty()) { + if (columns == null && columnsFromPath != null) { + throw new AnalysisException("Can not specify columns_from_path without column_list"); + } + List columnList = Lists.newArrayList(); + if (columns != null) { + 
columnList.addAll(columns); + if (columnsFromPath != null) { + columnList.addAll(columnsFromPath); + } + } + if (columnList.isEmpty()) { return; } // merge columns exprs from columns and columnMappingList // used to check duplicated column name Set columnNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + // Order of parsedColumnExprList: columns(fileFieldNames) + columnsFromPath parsedColumnExprList = Lists.newArrayList(); // Step1: analyze columns - for (String columnName : columns) { + for (String columnName : columnList) { if (!columnNames.add(columnName)) { throw new AnalysisException("Duplicate column : " + columnName); } @@ -224,17 +254,17 @@ private void analyzeColumns() throws AnalysisException { if (!(columnExpr instanceof BinaryPredicate)) { throw new AnalysisException("Mapping function expr only support the column or eq binary predicate. " - + "Expr: " + columnExpr.toSql()); + + "Expr: " + columnExpr.toSql()); } BinaryPredicate predicate = (BinaryPredicate) columnExpr; if (predicate.getOp() != Operator.EQ) { throw new AnalysisException("Mapping function expr only support the column or eq binary predicate. " - + "The mapping operator error, op: " + predicate.getOp()); + + "The mapping operator error, op: " + predicate.getOp()); } Expr child0 = predicate.getChild(0); if (!(child0 instanceof SlotRef)) { throw new AnalysisException("Mapping function expr only support the column or eq binary predicate. " - + "The mapping column error. column: " + child0.toSql()); + + "The mapping column error. column: " + child0.toSql()); } String column = ((SlotRef) child0).getColumnName(); if (!columnNames.add(column)) { @@ -244,7 +274,7 @@ private void analyzeColumns() throws AnalysisException { Expr child1 = predicate.getChild(1); if (isHadoopLoad && !(child1 instanceof FunctionCallExpr)) { throw new AnalysisException("Hadoop load only supports the designated function. 
" - + "The error mapping function is:" + child1.toSql()); + + "The error mapping function is:" + child1.toSql()); } ImportColumnDesc importColumnDesc = new ImportColumnDesc(column, child1); parsedColumnExprList.add(importColumnDesc); @@ -444,10 +474,10 @@ private void checkLoadPriv(String fullDbName) throws AnalysisException { // check auth if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), fullDbName, tableName, - PrivPredicate.LOAD)) { + PrivPredicate.LOAD)) { ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", - ConnectContext.get().getQualifiedUser(), - ConnectContext.get().getRemoteIP(), tableName); + ConnectContext.get().getQualifiedUser(), + ConnectContext.get().getRemoteIP(), tableName); } } @@ -491,6 +521,10 @@ public String apply(String s) { if (columnSeparator != null) { sb.append(" COLUMNS TERMINATED BY ").append(columnSeparator.toSql()); } + if (columnsFromPath != null && !columnsFromPath.isEmpty()) { + sb.append(" COLUMNS FROM PATH AS ("); + Joiner.on(", ").appendTo(sb, columnsFromPath).append(")"); + } if (columns != null && !columns.isEmpty()) { sb.append(" ("); Joiner.on(", ").appendTo(sb, columns).append(")"); diff --git a/fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java index a6cb66832c5369..bae07676e3cf45 100644 --- a/fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java +++ b/fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java @@ -17,6 +17,7 @@ package org.apache.doris.common.util; +import com.google.common.collect.Lists; import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.FsBroker; @@ -36,6 +37,7 @@ import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; +import java.util.Collections; import java.util.List; public class BrokerUtil { @@ -99,4 +101,42 @@ public static String printBroker(String brokerName, TNetworkAddress address) { return brokerName + "[" + address.toString() + "]"; } + public static List parseColumnsFromPath(String filePath, List columnsFromPath) throws UserException { + if (columnsFromPath == null || columnsFromPath.isEmpty()) { + return Collections.emptyList(); + } + String[] strings = filePath.split("/"); + if (strings.length < 2) { + throw new UserException("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath); + } + String[] columns = new String[columnsFromPath.size()]; + int size = 0; + for (int i = strings.length - 2; i >= 0; i--) { + String str = strings[i]; + if (str != null && str.isEmpty()) { + continue; + } + if (str == null || !str.contains("=")) { + throw new UserException("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath); + } + String[] pair = str.split("=", 2); + if (pair.length != 2) { + throw new UserException("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath); + } + int index = columnsFromPath.indexOf(pair[0]); + if (index == -1) { + continue; + } + columns[index] = pair[1]; + size++; + if (size >= columnsFromPath.size()) { + break; + } + } + if (size != columnsFromPath.size()) { + throw new UserException("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath); + } + return Lists.newArrayList(columns); + } + } diff --git a/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java 
index baf25b0cbe1d17..bf69f4a76cdaed 100644 --- a/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java +++ b/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java @@ -61,6 +61,7 @@ public class BrokerFileGroup implements Writable { private String lineDelimiter; // fileFormat may be null, which means format will be decided by file's suffix private String fileFormat; + private List columnsFromPath; private boolean isNegative; private List partitionIds; // this is a compatible param which only happens before the function of broker has been supported. @@ -88,6 +89,7 @@ public BrokerFileGroup(BrokerTable table) throws AnalysisException { public BrokerFileGroup(DataDescription dataDescription) { this.dataDescription = dataDescription; + this.columnsFromPath = dataDescription.getColumnsFromPath(); this.exprColumnMap = null; this.columnExprList = dataDescription.getParsedColumnExprList(); } @@ -161,6 +163,10 @@ public String getFileFormat() { return fileFormat; } + public List getColumnsFromPath() { + return columnsFromPath; + } + public boolean isNegative() { return isNegative; } @@ -192,6 +198,17 @@ public String toString() { } sb.append("]"); } + if (columnsFromPath != null) { + sb.append(",columnsFromPath=["); + int idx = 0; + for (String name : columnsFromPath) { + if (idx++ != 0) { + sb.append(","); + } + sb.append(name); + } + sb.append("]"); + } if (fileFieldNames != null) { sb.append(",fileFieldNames=["); int idx = 0; @@ -333,6 +350,7 @@ public void readFields(DataInput in) throws IOException { if (fileFieldNames == null || fileFieldNames.isEmpty()) { return; } + // Order of columnExprList: fileFieldNames + columnsFromPath columnExprList = Lists.newArrayList(); for (String columnName : fileFieldNames) { columnExprList.add(new ImportColumnDesc(columnName, null)); diff --git a/fe/src/main/java/org/apache/doris/load/Load.java b/fe/src/main/java/org/apache/doris/load/Load.java index 5aa11dd32a6f71..186d8324ca5560 100644 --- a/fe/src/main/java/org/apache/doris/load/Load.java +++ b/fe/src/main/java/org/apache/doris/load/Load.java @@ -651,11 +651,15 @@ public static void checkAndCreateSource(Database db, DataDescription dataDescrip for (Column column : tableSchema) { nameToTableColumn.put(column.getName(), column); } - - // source columns List columnNames = Lists.newArrayList(); - List assignColumnNames = dataDescription.getColumnNames(); - if (assignColumnNames == null) { + List assignColumnNames = Lists.newArrayList(); + if (dataDescription.getColumnNames() != null) { + assignColumnNames.addAll(dataDescription.getColumnNames()); + if (dataDescription.getColumnsFromPath() != null) { + assignColumnNames.addAll(dataDescription.getColumnsFromPath()); + } + } + if (assignColumnNames.isEmpty()) { // use table columns for (Column column : tableSchema) { columnNames.add(column.getName()); diff --git a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java index 3cda72adf7425d..d3a7c45d5be605 100644 --- a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -139,7 +139,7 @@ private static class ParamCreateContext { private List paramCreateContexts; public BrokerScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, - List> fileStatusesList, int filesAdded) { + List> fileStatusesList, int filesAdded) { super(id, desc, planNodeName); this.fileStatusesList = fileStatusesList; this.filesAdded = filesAdded; @@ -296,6 +296,7 @@ private 
void initColumns(ParamCreateContext context) throws UserException { slotDesc.setIsMaterialized(true); // same as ISSUE A slotDesc.setIsNullable(true); + slotDesc.setColumn(new Column(realColName, PrimitiveType.VARCHAR)); params.addToSrc_slot_ids(slotDesc.getId().asInt()); slotDescByName.put(realColName, slotDesc); } @@ -309,7 +310,7 @@ private void initColumns(ParamCreateContext context) throws UserException { SlotDescriptor slotDesc = slotDescByName.get(slot.getColumnName()); if (slotDesc == null) { throw new UserException("unknown reference column, column=" + entry.getKey() - + ", reference=" + slot.getColumnName()); + + ", reference=" + slot.getColumnName()); } smap.getLhs().add(slot); smap.getRhs().add(new SlotRef(slotDesc)); @@ -469,7 +470,7 @@ private void finalizeParams(ParamCreateContext context) throws UserException, An expr = NullLiteral.create(column.getType()); } else { throw new UserException("Unknown slot ref(" - + destSlotDesc.getColumn().getName() + ") in source file"); + + destSlotDesc.getColumn().getName() + ") in source file"); } } } @@ -479,12 +480,12 @@ private void finalizeParams(ParamCreateContext context) throws UserException, An if (destSlotDesc.getType().getPrimitiveType() == PrimitiveType.HLL) { if (!(expr instanceof FunctionCallExpr)) { throw new AnalysisException("HLL column must use hll_hash function, like " - + destSlotDesc.getColumn().getName() + "=hll_hash(xxx)"); + + destSlotDesc.getColumn().getName() + "=hll_hash(xxx)"); } FunctionCallExpr fn = (FunctionCallExpr) expr; if (!fn.getFnName().getFunction().equalsIgnoreCase("hll_hash")) { throw new AnalysisException("HLL column must use hll_hash function, like " - + destSlotDesc.getColumn().getName() + "=hll_hash(xxx)"); + + destSlotDesc.getColumn().getName() + "=hll_hash(xxx)"); } expr.setType(Type.HLL); } @@ -636,31 +637,35 @@ private TFileFormatType formatType(String fileFormat, String path) { // If fileFormat is not null, we use fileFormat instead of check file's suffix private void processFileGroup( - String fileFormat, - TBrokerScanRangeParams params, + ParamCreateContext context, List fileStatuses) throws UserException { if (fileStatuses == null || fileStatuses.isEmpty()) { return; } - TScanRangeLocations curLocations = newLocations(params, brokerDesc.getName()); + TScanRangeLocations curLocations = newLocations(context.params, brokerDesc.getName()); long curInstanceBytes = 0; long curFileOffset = 0; for (int i = 0; i < fileStatuses.size(); ) { TBrokerFileStatus fileStatus = fileStatuses.get(i); long leftBytes = fileStatus.size - curFileOffset; long tmpBytes = curInstanceBytes + leftBytes; - TFileFormatType formatType = formatType(fileFormat, fileStatus.path); + TFileFormatType formatType = formatType(context.fileGroup.getFileFormat(), fileStatus.path); + List columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path, + context.fileGroup.getColumnsFromPath()); + int numberOfColumnsFromFile = context.slotDescByName.size() - columnsFromPath.size(); if (tmpBytes > bytesPerInstance) { // Now only support split plain text if (formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable) { long rangeBytes = bytesPerInstance - curInstanceBytes; - TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType, rangeBytes); + TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType, + rangeBytes, columnsFromPath, numberOfColumnsFromFile); brokerScanRange(curLocations).addToRanges(rangeDesc); curFileOffset += rangeBytes; } else { - 
TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType, leftBytes); + TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType, + leftBytes, columnsFromPath, numberOfColumnsFromFile); brokerScanRange(curLocations).addToRanges(rangeDesc); curFileOffset = 0; i++; @@ -668,11 +673,12 @@ private void processFileGroup( // New one scan locationsList.add(curLocations); - curLocations = newLocations(params, brokerDesc.getName()); + curLocations = newLocations(context.params, brokerDesc.getName()); curInstanceBytes = 0; } else { - TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType, leftBytes); + TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType, + leftBytes, columnsFromPath, numberOfColumnsFromFile); brokerScanRange(curLocations).addToRanges(rangeDesc); curFileOffset = 0; curInstanceBytes += leftBytes; @@ -687,7 +693,8 @@ private void processFileGroup( } private TBrokerRangeDesc createBrokerRangeDesc(long curFileOffset, TBrokerFileStatus fileStatus, - TFileFormatType formatType, long rangeBytes) { + TFileFormatType formatType, long rangeBytes, + List columnsFromPath, int numberOfColumnsFromFile) { TBrokerRangeDesc rangeDesc = new TBrokerRangeDesc(); rangeDesc.setFile_type(TFileType.FILE_BROKER); rangeDesc.setFormat_type(formatType); @@ -696,6 +703,8 @@ private TBrokerRangeDesc createBrokerRangeDesc(long curFileOffset, TBrokerFileSt rangeDesc.setStart_offset(curFileOffset); rangeDesc.setSize(rangeBytes); rangeDesc.setFile_size(fileStatus.size); + rangeDesc.setNum_of_columns_from_file(numberOfColumnsFromFile); + rangeDesc.setColumns_from_path(columnsFromPath); return rangeDesc; } @@ -714,7 +723,7 @@ public void finalize(Analyzer analyzer) throws UserException { } catch (AnalysisException e) { throw new UserException(e.getMessage()); } - processFileGroup(context.fileGroup.getFileFormat(), context.params, fileStatuses); + processFileGroup(context, fileStatuses); } if (LOG.isDebugEnabled()) { for (TScanRangeLocations locations : locationsList) { @@ -766,3 +775,5 @@ protected String getNodeExplainString(String prefix, TExplainLevel detailLevel) } } + + diff --git a/fe/src/main/jflex/sql_scanner.flex b/fe/src/main/jflex/sql_scanner.flex index 07d1fb61999444..fb80ff749474b2 100644 --- a/fe/src/main/jflex/sql_scanner.flex +++ b/fe/src/main/jflex/sql_scanner.flex @@ -155,6 +155,7 @@ import org.apache.doris.common.util.SqlUtils; keywordMap.put("following", new Integer(SqlParserSymbols.KW_FOLLOWING)); keywordMap.put("for", new Integer(SqlParserSymbols.KW_FOR)); keywordMap.put("format", new Integer(SqlParserSymbols.KW_FORMAT)); + keywordMap.put("path", new Integer(SqlParserSymbols.KW_PATH)); keywordMap.put("from", new Integer(SqlParserSymbols.KW_FROM)); keywordMap.put("frontend", new Integer(SqlParserSymbols.KW_FRONTEND)); keywordMap.put("frontends", new Integer(SqlParserSymbols.KW_FRONTENDS)); diff --git a/fe/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java b/fe/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java new file mode 100644 index 00000000000000..f8f1815142f21f --- /dev/null +++ b/fe/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java @@ -0,0 +1,114 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.util; + +import com.google.common.collect.Lists; +import org.apache.doris.common.UserException; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.*; + +public class BrokerUtilTest { + + @Test + public void parseColumnsFromPath() { + String path = "/path/to/dir/k1=v1/xxx.csv"; + try { + List columns = BrokerUtil.parseColumnsFromPath(path, Collections.singletonList("k1")); + assertEquals(1, columns.size()); + assertEquals(Collections.singletonList("v1"), columns); + } catch (UserException e) { + fail(); + } + + path = "/path/to/dir/k1/xxx.csv"; + try { + List columns = BrokerUtil.parseColumnsFromPath(path, Collections.singletonList("k1")); + fail(); + } catch (UserException ignored) { + } + + path = "/path/to/dir/k1=v1/xxx.csv"; + try { + List columns = BrokerUtil.parseColumnsFromPath(path, Collections.singletonList("k2")); + fail(); + } catch (UserException ignored) { + } + + path = "/path/to/dir/k1=v2/k1=v1/xxx.csv"; + try { + List columns = BrokerUtil.parseColumnsFromPath(path, Collections.singletonList("k1")); + assertEquals(1, columns.size()); + assertEquals(Collections.singletonList("v1"), columns); + } catch (UserException e) { + fail(); + } + + path = "/path/to/dir/k2=v2/k1=v1/xxx.csv"; + try { + List columns = BrokerUtil.parseColumnsFromPath(path, Lists.newArrayList("k1", "k2")); + assertEquals(2, columns.size()); + assertEquals(Lists.newArrayList("v1", "v2"), columns); + } catch (UserException e) { + fail(); + } + + path = "/path/to/dir/k2=v2/a/k1=v1/xxx.csv"; + try { + List columns = BrokerUtil.parseColumnsFromPath(path, Lists.newArrayList("k1", "k2")); + fail(); + } catch (UserException ignored) { + } + + path = "/path/to/dir/k2=v2/k1=v1/xxx.csv"; + try { + List columns = BrokerUtil.parseColumnsFromPath(path, Lists.newArrayList("k1", "k2", "k3")); + fail(); + } catch (UserException ignored) { + } + + path = "/path/to/dir/k2=v2//k1=v1//xxx.csv"; + try { + List columns = BrokerUtil.parseColumnsFromPath(path, Lists.newArrayList("k1", "k2")); + assertEquals(2, columns.size()); + assertEquals(Lists.newArrayList("v1", "v2"), columns); + } catch (UserException e) { + fail(); + } + + path = "/path/to/dir/k2==v2=//k1=v1//xxx.csv"; + try { + List columns = BrokerUtil.parseColumnsFromPath(path, Lists.newArrayList("k1", "k2")); + assertEquals(2, columns.size()); + assertEquals(Lists.newArrayList("v1", "=v2="), columns); + } catch (UserException e) { + fail(); + } + + path = "/path/to/dir/k2==v2=//k1=v1/"; + try { + List columns = BrokerUtil.parseColumnsFromPath(path, Lists.newArrayList("k1", "k2")); + fail(); + } catch (UserException ignored) { + } + + } +} diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index d587e34b58b8c9..6c0d5541e0caeb 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -114,6 +114,10 @@ struct TBrokerRangeDesc { 7: optional 
Types.TUniqueId load_id // total size of the file 8: optional i64 file_size + // number of columns from file + 9: optional i32 num_of_columns_from_file = 0 + // columns parsed from file path should be after the columns read from file + 10: optional list<string> columns_from_path } struct TBrokerScanRangeParams {
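Taken together, the two new TBrokerRangeDesc fields define the layout contract between FE and BE: the first num_of_columns_from_file source slots are filled from the file itself, and the remaining slots are filled, in declared order, from columns_from_path — which is exactly what fill_slots_of_columns_from_path(range.num_of_columns_from_file, range.columns_from_path) does in the scanners above, and why the broker scanner now compares values.size() + columns_from_path.size() against _src_slot_descs.size(). A small plain-Java illustration of that contract (a hypothetical helper, independent of the actual BE code, which is C++):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class SourceRowLayoutSketch {
        // Assemble one source row the way the BE scanners do: the columns read from the
        // file come first, followed by the values parsed from the file path.
        static List<String> assembleSourceRow(List<String> valuesFromFile,
                                              List<String> columnsFromPath,
                                              int numOfColumnsFromFile,
                                              int numSrcSlots) {
            // mirrors the "actual column number is less/more than schema column number" checks
            if (valuesFromFile.size() != numOfColumnsFromFile
                    || numOfColumnsFromFile + columnsFromPath.size() != numSrcSlots) {
                throw new IllegalStateException("column count does not match the source schema");
            }
            List<String> row = new ArrayList<>(valuesFromFile);
            row.addAll(columnsFromPath);
            return row;
        }

        public static void main(String[] args) {
            // e.g. the CSV line "1,2,3" read from .../city=beijing/utc_date=2019-06-26/0000.csv,
            // with a source tuple of 3 file slots + 2 path slots
            List<String> row = assembleSourceRow(
                    Arrays.asList("1", "2", "3"),
                    Arrays.asList("beijing", "2019-06-26"), 3, 5);
            System.out.println(row); // [1, 2, 3, beijing, 2019-06-26]
        }
    }

On the FE, processFileGroup computes numberOfColumnsFromFile as context.slotDescByName.size() - columnsFromPath.size(), so well-formed plans always satisfy this check.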