From 2e2b46656fc97f6c3f2ac0e0ac288ebaf4e60935 Mon Sep 17 00:00:00 2001 From: yuanlihan Date: Tue, 13 Aug 2019 00:09:59 +0800 Subject: [PATCH 01/14] Enable parsing columns from file path for Broker Load --- be/src/exec/broker_scanner.cpp | 39 +++++++--- be/src/exec/broker_scanner.h | 2 + be/src/exec/parquet_reader.cpp | 63 +++++++++++++--- be/src/exec/parquet_reader.h | 3 +- be/src/exec/parquet_scanner.cpp | 2 +- be/test/exec/broker_scan_node_test.cpp | 75 ++++++++++++++++--- be/test/exec/parquet_scanner_test.cpp | 11 ++- .../sql-statements/Data Manipulation/LOAD.md | 23 ++++++ fe/src/main/cup/sql_parser.cup | 16 +++- .../doris/analysis/DataDescription.java | 36 ++++++++- .../org/apache/doris/common/FeConstants.java | 2 +- .../apache/doris/common/FeMetaVersion.java | 2 + .../apache/doris/load/BrokerFileGroup.java | 41 ++++++++++ .../main/java/org/apache/doris/load/Load.java | 12 ++- .../apache/doris/planner/BrokerScanNode.java | 50 +++++++++++-- fe/src/main/jflex/sql_scanner.flex | 1 + gensrc/thrift/PlanNodes.thrift | 2 + 17 files changed, 325 insertions(+), 55 deletions(-) diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index 19cd829f6826e6..e5d67b1bd34a05 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -54,6 +54,7 @@ BrokerScanner::BrokerScanner(RuntimeState* state, _cur_decompressor(nullptr), _next_range(0), _cur_line_reader_eof(false), + _columns_from_path(), _scanner_eof(false), _skip_next_line(false) { } @@ -236,6 +237,9 @@ Status BrokerScanner::open_line_reader() { // _decompressor may be NULL if this is not a compressed file RETURN_IF_ERROR(create_decompressor(range.format_type)); + // set columns parsed from this file path + _columns_from_path = range.columns_from_path; + // open line reader switch (range.format_type) { case TFileFormatType::FORMAT_CSV_PLAIN: @@ -452,6 +456,17 @@ bool BrokerScanner::convert_one_row( return fill_dest_tuple(line, tuple, tuple_pool); } +inline void BrokerScanner::fill_slot(SlotDescriptor* slot_desc, const Slice& value) { + if (slot_desc->is_nullable() && is_null(value)) { + _src_tuple->set_null(slot_desc->null_indicator_offset()); + } + _src_tuple->set_not_null(slot_desc->null_indicator_offset()); + void* slot = _src_tuple->get_slot(slot_desc->tuple_offset()); + StringValue* str_slot = reinterpret_cast(slot); + str_slot->ptr = value.data; + str_slot->len = value.size; +} + // Convert one row to this tuple bool BrokerScanner::line_to_src_tuple(const Slice& line) { @@ -469,7 +484,7 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) { split_line(line, &values); } - if (values.size() < _src_slot_descs.size()) { + if (values.size() + _columns_from_path.size() < _src_slot_descs.size()) { std::stringstream error_msg; error_msg << "actual column number is less than schema column number. " << "actual number: " << values.size() << " sep: " << _value_separator << ", " @@ -478,7 +493,7 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) { error_msg.str()); _counter->num_rows_filtered++; return false; - } else if (values.size() > _src_slot_descs.size()) { + } else if (values.size() + _columns_from_path.size() > _src_slot_descs.size()) { std::stringstream error_msg; error_msg << "actual column number is more than schema column number. 
" << "actual number: " << values.size() << " sep: " << _value_separator << ", " @@ -489,18 +504,18 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) { return false; } - for (int i = 0; i < values.size(); ++i) { + int file_column_index = 0; + for (int i = 0; i < _src_slot_descs.size(); ++i) { auto slot_desc = _src_slot_descs[i]; - const Slice& value = values[i]; - if (slot_desc->is_nullable() && is_null(value)) { - _src_tuple->set_null(slot_desc->null_indicator_offset()); - continue; + auto iter = _columns_from_path.find(slot_desc->col_name()); + if (iter != _columns_from_path.end()) { + std::string partitioned_field = iter->second; + const Slice value = Slice(partitioned_field.c_str(), partitioned_field.size()); + fill_slot(slot_desc, value); + } else { + const Slice& value = values[file_column_index++]; + fill_slot(slot_desc, value); } - _src_tuple->set_not_null(slot_desc->null_indicator_offset()); - void* slot = _src_tuple->get_slot(slot_desc->tuple_offset()); - StringValue* str_slot = reinterpret_cast(slot); - str_slot->ptr = value.data; - str_slot->len = value.size; } return true; diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h index b9a798694dc8d9..e21db9263554eb 100644 --- a/be/src/exec/broker_scanner.h +++ b/be/src/exec/broker_scanner.h @@ -104,6 +104,7 @@ class BrokerScanner : public BaseScanner { //Status init_expr_ctxes(); Status line_to_src_tuple(); + void fill_slot(SlotDescriptor* slot_desc, const Slice& value); bool line_to_src_tuple(const Slice& line); private:; const std::vector& _ranges; @@ -120,6 +121,7 @@ private:; Decompressor* _cur_decompressor; int _next_range; bool _cur_line_reader_eof; + std::map _columns_from_path; bool _scanner_eof; diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/parquet_reader.cpp index 795688f0ea964e..1ae97530928899 100644 --- a/be/src/exec/parquet_reader.cpp +++ b/be/src/exec/parquet_reader.cpp @@ -34,8 +34,8 @@ namespace doris { // Broker -ParquetReaderWrap::ParquetReaderWrap(FileReader *file_reader) : - _total_groups(0), _current_group(0), _rows_of_group(0), _current_line_of_group(0) { +ParquetReaderWrap::ParquetReaderWrap(FileReader *file_reader, const std::map& columns_from_path) : + _columns_from_path(columns_from_path), _total_groups(0), _current_group(0), _rows_of_group(0), _current_line_of_group(0) { _parquet = std::shared_ptr(new ParquetFile(file_reader)); _properties = parquet::ReaderProperties(); _properties.enable_buffered_stream(); @@ -125,13 +125,17 @@ Status ParquetReaderWrap::column_indices(const std::vector& tup for (auto slot_desc : tuple_slot_descs) { // Get the Column Reader for the boolean column auto iter = _map_column.find(slot_desc->col_name()); - if (iter == _map_column.end()) { - std::stringstream str_error; - str_error << "Invalid Column Name:" << slot_desc->col_name(); - LOG(WARNING) << str_error.str(); - return Status::InvalidArgument(str_error.str()); + if (iter != _map_column.end()) { + _parquet_column_ids.emplace_back(iter->second); + } else { + auto iter_1 = _columns_from_path.find(slot_desc->col_name()); + if (iter_1 == _columns_from_path.end()) { + std::stringstream str_error; + str_error << "Invalid Column Name:" << slot_desc->col_name(); + LOG(WARNING) << str_error.str(); + return Status::InvalidArgument(str_error.str()); + } } - _parquet_column_ids.emplace_back(iter->second); } return Status::OK(); } @@ -204,13 +208,23 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& uint8_t tmp_buf[128] = {0}; int32_t wbytes = 0; const uint8_t *value = nullptr; 
+ int index = 0; int column_index = 0; try { size_t slots = tuple_slot_descs.size(); for (size_t i = 0; i < slots; ++i) { auto slot_desc = tuple_slot_descs[i]; - column_index = i;// column index in batch record - switch (_parquet_column_type[i]) { + auto iter = _columns_from_path.find(slot_desc->col_name()); + if (iter != _columns_from_path.end()) { + std::string partitioned_field = iter->second; + value = reinterpret_cast(partitioned_field.c_str()); + wbytes = partitioned_field.size(); + fill_slot(tuple, slot_desc, mem_pool, value, wbytes); + continue; + } else { + column_index = index++; // column index in batch record + } + switch (_parquet_column_type[column_index]) { case arrow::Type::type::STRING: { auto str_array = std::dynamic_pointer_cast(_batch->column(column_index)); if (str_array->IsNull(_current_line_of_group)) { @@ -396,6 +410,35 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& } break; } + case arrow::Type::type::DATE32: { + auto ts_array = std::dynamic_pointer_cast(_batch->column(column_index)); + if (ts_array->IsNull(_current_line_of_group)) { + RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); + } else { + time_t timestamp = (time_t)((int64_t)ts_array->Value(_current_line_of_group) * 24 * 60 * 60); + tm* local; + local = localtime(×tamp); + char* to = reinterpret_cast(&tmp_buf); + wbytes = (uint32_t)strftime(to, 64, "%Y-%m-%d", local); + fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes); + } + break; + } + case arrow::Type::type::DATE64: { + auto ts_array = std::dynamic_pointer_cast(_batch->column(column_index)); + if (ts_array->IsNull(_current_line_of_group)) { + RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); + } else { + // convert milliseconds to seconds + time_t timestamp = (time_t)((int64_t)ts_array->Value(_current_line_of_group) / 1000); + tm* local; + local = localtime(×tamp); + char* to = reinterpret_cast(&tmp_buf); + wbytes = (uint32_t)strftime(to, 64, "%Y-%m-%d %H:%M:%S", local); + fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes); + } + break; + } default: { // other type not support. 
std::stringstream str_error; diff --git a/be/src/exec/parquet_reader.h b/be/src/exec/parquet_reader.h index defe3d9ebf09fd..d1445e63870706 100644 --- a/be/src/exec/parquet_reader.h +++ b/be/src/exec/parquet_reader.h @@ -68,7 +68,7 @@ class ParquetFile : public arrow::io::RandomAccessFile { // Reader of broker parquet file class ParquetReaderWrap { public: - ParquetReaderWrap(FileReader *file_reader); + ParquetReaderWrap(FileReader *file_reader, const std::map& columns_from_path); virtual ~ParquetReaderWrap(); // Read @@ -85,6 +85,7 @@ class ParquetReaderWrap { Status handle_timestamp(const std::shared_ptr& ts_array, uint8_t *buf, int32_t *wbtyes); private: + const std::map& _columns_from_path; parquet::ReaderProperties _properties; std::shared_ptr _parquet; diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index 2ce43a121956a1..0d4bcfdfeffcd3 100644 --- a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -141,7 +141,7 @@ Status ParquetScanner::open_next_reader() { file_reader->close(); continue; } - _cur_file_reader = new ParquetReaderWrap(file_reader.release()); + _cur_file_reader = new ParquetReaderWrap(file_reader.release(), range.columns_from_path); Status status = _cur_file_reader->init_parquet_reader(_src_slot_descs); if (status.is_end_of_file()) { continue; diff --git a/be/test/exec/broker_scan_node_test.cpp b/be/test/exec/broker_scan_node_test.cpp index 5ccb5506e3c1d7..0bc51602c3dd73 100644 --- a/be/test/exec/broker_scan_node_test.cpp +++ b/be/test/exec/broker_scan_node_test.cpp @@ -153,7 +153,33 @@ void BrokerScanNodeTest::init_desc_table() { slot_desc.nullIndicatorByte = 0; slot_desc.nullIndicatorBit = -1; slot_desc.colName = "k3"; - slot_desc.slotIdx = 2; + slot_desc.slotIdx = 3; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + // k4(partitioned column) + { + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 0; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::INT); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = 1; + slot_desc.byteOffset = 12; + slot_desc.nullIndicatorByte = 0; + slot_desc.nullIndicatorBit = -1; + slot_desc.colName = "k4"; + slot_desc.slotIdx = 4; slot_desc.isMaterialized = true; t_desc_table.slotDescriptors.push_back(slot_desc); @@ -164,7 +190,7 @@ void BrokerScanNodeTest::init_desc_table() { // TTupleDescriptor dest TTupleDescriptor t_tuple_desc; t_tuple_desc.id = 0; - t_tuple_desc.byteSize = 12; + t_tuple_desc.byteSize = 16; t_tuple_desc.numNullBytes = 0; t_tuple_desc.tableId = 0; t_tuple_desc.__isset.tableId = true; @@ -251,7 +277,34 @@ void BrokerScanNodeTest::init_desc_table() { slot_desc.nullIndicatorByte = 0; slot_desc.nullIndicatorBit = -1; slot_desc.colName = "k3"; - slot_desc.slotIdx = 2; + slot_desc.slotIdx = 3; + slot_desc.isMaterialized = true; + + t_desc_table.slotDescriptors.push_back(slot_desc); + } + // k4(partitioned column) + { + TSlotDescriptor slot_desc; + + slot_desc.id = next_slot_id++; + slot_desc.parent = 1; + TTypeDesc type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::VARCHAR); + scalar_type.__set_len(65535); + node.__set_scalar_type(scalar_type); + type.types.push_back(node); + } + slot_desc.slotType = type; + slot_desc.columnPos = 1; + 
slot_desc.byteOffset = 48; + slot_desc.nullIndicatorByte = 0; + slot_desc.nullIndicatorBit = -1; + slot_desc.colName = "k4"; + slot_desc.slotIdx = 4; slot_desc.isMaterialized = true; t_desc_table.slotDescriptors.push_back(slot_desc); @@ -261,7 +314,7 @@ void BrokerScanNodeTest::init_desc_table() { // TTupleDescriptor source TTupleDescriptor t_tuple_desc; t_tuple_desc.id = 1; - t_tuple_desc.byteSize = 48; + t_tuple_desc.byteSize = 60; t_tuple_desc.numNullBytes = 0; t_tuple_desc.tableId = 0; t_tuple_desc.__isset.tableId = true; @@ -276,7 +329,7 @@ void BrokerScanNodeTest::init_desc_table() { void BrokerScanNodeTest::init() { _params.column_separator = ','; _params.line_delimiter = '\n'; - + TTypeDesc int_type; { TTypeNode node; @@ -297,7 +350,7 @@ void BrokerScanNodeTest::init() { varchar_type.types.push_back(node); } - for (int i = 0; i < 3; ++i) { + for (int i = 0; i < 4; ++i) { TExprNode cast_expr; cast_expr.node_type = TExprNodeType::CAST_EXPR; cast_expr.type = int_type; @@ -319,7 +372,7 @@ void BrokerScanNodeTest::init() { slot_ref.type = varchar_type; slot_ref.num_children = 0; slot_ref.__isset.slot_ref = true; - slot_ref.slot_ref.slot_id = 4 + i; + slot_ref.slot_ref.slot_id = 5 + i; slot_ref.slot_ref.tuple_id = 1; TExpr expr; @@ -327,7 +380,7 @@ void BrokerScanNodeTest::init() { expr.nodes.push_back(slot_ref); _params.expr_of_dest_slot.emplace(i + 1, expr); - _params.src_slot_ids.push_back(4 + i); + _params.src_slot_ids.push_back(5 + i); } // _params.__isset.expr_of_dest_slot = true; _params.__set_dest_tuple_id(0); @@ -367,6 +420,8 @@ TEST_F(BrokerScanNodeTest, normal) { range.file_type = TFileType::FILE_LOCAL; range.format_type = TFileFormatType::FORMAT_CSV_PLAIN; range.splittable = true; + std::map columns_from_path = {{"k4", "1"}}; + range.__set_columns_from_path(columns_from_path); broker_scan_range.ranges.push_back(range); scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range); @@ -386,6 +441,8 @@ TEST_F(BrokerScanNodeTest, normal) { range.file_type = TFileType::FILE_LOCAL; range.format_type = TFileFormatType::FORMAT_CSV_PLAIN; range.splittable = true; + std::map columns_from_path = {{"k4", "2"}}; + range.__set_columns_from_path(columns_from_path); broker_scan_range.ranges.push_back(range); scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range); @@ -394,7 +451,7 @@ TEST_F(BrokerScanNodeTest, normal) { } scan_node.set_scan_ranges(scan_ranges); - + status = scan_node.open(&_runtime_state); ASSERT_TRUE(status.ok()); diff --git a/be/test/exec/parquet_scanner_test.cpp b/be/test/exec/parquet_scanner_test.cpp index 2d96e44cdd5b95..d91eec40eccb2d 100644 --- a/be/test/exec/parquet_scanner_test.cpp +++ b/be/test/exec/parquet_scanner_test.cpp @@ -68,14 +68,14 @@ class ParquetSannerTest : public testing::Test { #define TUPLE_ID_DST 0 #define TUPLE_ID_SRC 1 -#define CLOMN_NUMBERS 19 +#define CLOMN_NUMBERS 20 #define DST_TUPLE_SLOT_ID_START 1 -#define SRC_TUPLE_SLOT_ID_START 20 +#define SRC_TUPLE_SLOT_ID_START 21 int ParquetSannerTest::create_src_tuple(TDescriptorTable& t_desc_table, int next_slot_id) { const char *clomnNames[] = {"log_version", "log_time", "log_time_stamp", "js_version", "vst_cookie", "vst_ip", "vst_user_id", "vst_user_agent", "device_resolution", "page_url", "page_refer_url", "page_yyid", "page_type", "pos_type", "content_id", "media_id", - "spm_cnt", "spm_pre", "scm_cnt"}; + "spm_cnt", "spm_pre", "scm_cnt", "partition_column"}; for (int i = 0; i < CLOMN_NUMBERS; i++) { TSlotDescriptor slot_desc; @@ -201,7 +201,7 @@ int 
ParquetSannerTest::create_dst_tuple(TDescriptorTable& t_desc_table, int next const char *clomnNames[] = {"log_version", "log_time", "log_time_stamp", "js_version", "vst_cookie", "vst_ip", "vst_user_id", "vst_user_agent", "device_resolution", "page_url", "page_refer_url", "page_yyid", "page_type", "pos_type", "content_id", "media_id", - "spm_cnt", "spm_pre", "scm_cnt"}; + "spm_cnt", "spm_pre", "scm_cnt", "partition_column"}; for (int i = 3; i < CLOMN_NUMBERS; i++, byteOffset+=16) { TSlotDescriptor slot_desc; @@ -435,6 +435,9 @@ TEST_F(ParquetSannerTest, normal) { range.size = -1; range.format_type = TFileFormatType::FORMAT_PARQUET; range.splittable = true; + + std::map columns_from_path = {{"partition_column", "value"}}; + range.__set_columns_from_path(columns_from_path); #if 1 range.path = "./be/test/exec/test_data/parquet_scanner/localfile.parquet"; range.file_type = TFileType::FILE_LOCAL; diff --git a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/LOAD.md b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/LOAD.md index 34a7c8c8ad5690..07a1ef3ab11918 100644 --- a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/LOAD.md +++ b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/LOAD.md @@ -48,6 +48,7 @@ [PARTITION (p1, p2)] [COLUMNS TERMINATED BY "column_separator"] [FORMAT AS "file_type"] + [COLUMNS FROM PATH AS (columns_from_path)] [(column_list)] [SET (k1 = func(k2))] @@ -74,6 +75,12 @@ file_type: 用于指定导入文件的类型,例如:parquet、csv。默认值通过文件后缀名判断。 + + columns_from_path: + + 用于指定需要从文件路径中解析的字段。 + 语法: + (col_from_path_name1, col_from_path_name2, ...) column_list: @@ -278,6 +285,22 @@ (k1, k2, k3) ) WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password"); + + 9. 提取文件路径中的压缩字段 + 如果需要,则会根据表中定义的字段类型解析文件路径中的压缩字段(partitioned fields),类似Spark中Partition Discovery的功能 + LOAD LABEL example_db.label10 + ( + DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/*/*") + INTO TABLE `my_table` + FORMAT AS "csv" + COLUMNS FROM PATH AS (city, utc_date) + (k1, k2, k3) + SET (uniq_id = md5sum(k1, city)) + ) + WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password"); + + hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing目录下包括如下文件:[hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/utc_date=2019-06-26/0000.csv, hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/utc_date=2019-06-26/0001.csv, ...] 
+ 则提取文件路径的中的city和utc_date字段 ## keyword LOAD diff --git a/fe/src/main/cup/sql_parser.cup b/fe/src/main/cup/sql_parser.cup index 5dbcfe99d66977..735bca9a857dc8 100644 --- a/fe/src/main/cup/sql_parser.cup +++ b/fe/src/main/cup/sql_parser.cup @@ -211,7 +211,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_A KW_MAX, KW_MAX_VALUE, KW_MERGE, KW_MIN, KW_MIGRATE, KW_MIGRATIONS, KW_MODIFY, KW_NAME, KW_NAMES, KW_NEGATIVE, KW_NO, KW_NOT, KW_NULL, KW_NULLS, KW_OBSERVER, KW_OFFSET, KW_ON, KW_ONLY, KW_OPEN, KW_OR, KW_ORDER, KW_OUTER, KW_OVER, - KW_PARTITION, KW_PARTITIONS, KW_PRECEDING, + KW_PARTITION, KW_PARTITIONS, KW_PATH, KW_PRECEDING, KW_PASSWORD, KW_PLUGIN, KW_PLUGINS, KW_PRIMARY, KW_PROC, KW_PROCEDURE, KW_PROCESSLIST, KW_PROPERTIES, KW_PROPERTY, @@ -379,7 +379,7 @@ nonterminal LabelName job_label; nonterminal String opt_system; nonterminal String opt_cluster; nonterminal BrokerDesc opt_broker; -nonterminal List opt_col_list, col_list, opt_dup_keys; +nonterminal List opt_col_list, col_list, opt_dup_keys, opt_columns_from_path; nonterminal List opt_partitions, partitions; nonterminal List opt_col_mapping_list; nonterminal ColumnSeparator opt_field_term, column_separator; @@ -1050,10 +1050,11 @@ data_desc ::= opt_partitions:partitionNames opt_field_term:colSep opt_file_format:fileFormat + opt_columns_from_path:columnsFromPath opt_col_list:colList opt_col_mapping_list:colMappingList {: - RESULT = new DataDescription(tableName, partitionNames, files, colList, colSep, fileFormat, isNeg, colMappingList); + RESULT = new DataDescription(tableName, partitionNames, files, colList, colSep, fileFormat, columnsFromPath, isNeg, colMappingList); :} ; @@ -1110,6 +1111,13 @@ opt_file_format ::= {: RESULT = format; :} ; +opt_columns_from_path ::= + /* Empty */ + {: RESULT = null; :} + | KW_COLUMNS KW_FROM KW_PATH KW_AS LPAREN ident_list:columnsFromPath RPAREN + {: RESULT = columnsFromPath; :} + ; + opt_col_list ::= {: RESULT = null; @@ -3924,6 +3932,8 @@ keyword ::= {: RESULT = id; :} | KW_FORMAT:id {: RESULT = id; :} + | KW_PATH:id + {: RESULT = id; :} | KW_FUNCTION:id {: RESULT = id; :} | KW_END:id diff --git a/fe/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/src/main/java/org/apache/doris/analysis/DataDescription.java index 0623a38ea30928..cb5119f58294fc 100644 --- a/fe/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -52,13 +52,14 @@ // [PARTITION (p1, p2)] // [COLUMNS TERMINATED BY separator] // [FORMAT AS format] +// [COLUMNS FROM PATH AS (col1, ...)] // [(tmp_col1, tmp_col2, col3, ...)] // [SET (k1=f1(xx), k2=f2(xxx))] /** * The transform of columns should be added after the keyword named COLUMNS. * The transform after the keyword named SET is the old ways which only supports the hadoop function. - * It old way of transform will be removed gradually. It + * It old way of transform will be removed gradually. 
It */ public class DataDescription { private static final Logger LOG = LogManager.getLogger(DataDescription.class); @@ -75,6 +76,7 @@ public class DataDescription { private final List columns; private final ColumnSeparator columnSeparator; private final String fileFormat; + private final List columnsFromPath; private final boolean isNegative; private final List columnMappingList; @@ -101,12 +103,25 @@ public DataDescription(String tableName, String fileFormat, boolean isNegative, List columnMappingList) { + this(tableName, partitionNames, filePaths, columns, columnSeparator, fileFormat, null, isNegative, columnMappingList); + } + + public DataDescription(String tableName, + List partitionNames, + List filePaths, + List columns, + ColumnSeparator columnSeparator, + String fileFormat, + List columnsFromPath, + boolean isNegative, + List columnMappingList) { this.tableName = tableName; this.partitionNames = partitionNames; this.filePaths = filePaths; this.columns = columns; this.columnSeparator = columnSeparator; this.fileFormat = fileFormat; + this.columnsFromPath = columnsFromPath; this.isNegative = isNegative; this.columnMappingList = columnMappingList; } @@ -135,6 +150,10 @@ public String getFileFormat() { return fileFormat; } + public List getColumnsFromPath() { + return columnsFromPath; + } + public String getColumnSeparator() { if (columnSeparator == null) { return null; @@ -196,7 +215,14 @@ public boolean isHadoopLoad() { * "col2": "tmp_col2+1", "col3": "strftime("%Y-%m-%d %H:%M:%S", tmp_col3)"} */ private void analyzeColumns() throws AnalysisException { - if (columns == null || columns.isEmpty()) { + List columnList = Lists.newArrayList(); + if (columns != null) { + columnList.addAll(columns); + } + if (columnsFromPath != null) { + columnList.addAll(columnsFromPath); + } + if (columnList.isEmpty()) { return; } // merge columns exprs from columns and columnMappingList @@ -204,7 +230,7 @@ private void analyzeColumns() throws AnalysisException { Set columnNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); parsedColumnExprList = Lists.newArrayList(); // Step1: analyze columns - for (String columnName : columns) { + for (String columnName : columnList) { if (!columnNames.add(columnName)) { throw new AnalysisException("Duplicate column : " + columnName); } @@ -491,6 +517,10 @@ public String apply(String s) { if (columnSeparator != null) { sb.append(" COLUMNS TERMINATED BY ").append(columnSeparator.toSql()); } + if (columnsFromPath != null && !columnsFromPath.isEmpty()) { + sb.append(" COLUMNS FROM PATH AS ("); + Joiner.on(", ").appendTo(sb, columnsFromPath).append(")"); + } if (columns != null && !columns.isEmpty()) { sb.append(" ("); Joiner.on(", ").appendTo(sb, columns).append(")"); diff --git a/fe/src/main/java/org/apache/doris/common/FeConstants.java b/fe/src/main/java/org/apache/doris/common/FeConstants.java index 95683e20ac3693..1c87c9926f49d9 100644 --- a/fe/src/main/java/org/apache/doris/common/FeConstants.java +++ b/fe/src/main/java/org/apache/doris/common/FeConstants.java @@ -35,5 +35,5 @@ public class FeConstants { // general model // Current meta data version. 
Use this version to write journals and image - public static int meta_version = FeMetaVersion.VERSION_58; + public static int meta_version = FeMetaVersion.VERSION_59; } diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java index b5bdb2a42d9edd..01482f74ca1d0a 100644 --- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -126,4 +126,6 @@ public final class FeMetaVersion { public static final int VERSION_57 = 57; // broker load support function, persist origin stmt in broker load public static final int VERSION_58 = 58; + // broker load support parsing columns from file path + public static final int VERSION_59 = 59; } diff --git a/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java index baf25b0cbe1d17..4f934d0c2e40e3 100644 --- a/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java +++ b/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java @@ -61,6 +61,7 @@ public class BrokerFileGroup implements Writable { private String lineDelimiter; // fileFormat may be null, which means format will be decided by file's suffix private String fileFormat; + private List columnsFromPath; private boolean isNegative; private List partitionIds; // this is a compatible param which only happens before the function of broker has been supported. @@ -88,6 +89,7 @@ public BrokerFileGroup(BrokerTable table) throws AnalysisException { public BrokerFileGroup(DataDescription dataDescription) { this.dataDescription = dataDescription; + this.columnsFromPath = dataDescription.getColumnsFromPath(); this.exprColumnMap = null; this.columnExprList = dataDescription.getParsedColumnExprList(); } @@ -161,6 +163,10 @@ public String getFileFormat() { return fileFormat; } + public List getColumnsFromPath() { + return columnsFromPath; + } + public boolean isNegative() { return isNegative; } @@ -192,6 +198,17 @@ public String toString() { } sb.append("]"); } + if (columnsFromPath != null) { + sb.append(",columnsFromPath=["); + int idx = 0; + for (String name : columnsFromPath) { + if (idx++ != 0) { + sb.append(","); + } + sb.append(name); + } + sb.append("]"); + } if (fileFieldNames != null) { sb.append(",fileFieldNames=["); int idx = 0; @@ -273,6 +290,15 @@ public void write(DataOutput out) throws IOException { out.writeBoolean(true); Text.writeString(out, fileFormat); } + // columnsFromPath + if (columnsFromPath == null) { + out.writeInt(0); + } else { + out.writeInt(columnsFromPath.size()); + for (String name : columnsFromPath) { + Text.writeString(out, name); + } + } } @Override @@ -326,6 +352,16 @@ public void readFields(DataInput in) throws IOException { fileFormat = Text.readString(in); } } + // columnsFromPath + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_59) { + int fileFieldNameSize = in.readInt(); + if (fileFieldNameSize > 0) { + columnsFromPath = Lists.newArrayList(); + for (int i = 0; i < fileFieldNameSize; ++i) { + columnsFromPath.add(Text.readString(in)); + } + } + } // There are no columnExprList in the previous load job which is created before function is supported. // The columnExprList could not be analyzed without origin stmt in the previous load job. 
@@ -337,6 +373,11 @@ public void readFields(DataInput in) throws IOException { for (String columnName : fileFieldNames) { columnExprList.add(new ImportColumnDesc(columnName, null)); } + if (columnsFromPath != null && !columnsFromPath.isEmpty()) { + for (String columnName : columnsFromPath) { + columnExprList.add(new ImportColumnDesc(columnName, null)); + } + } if (exprColumnMap == null || exprColumnMap.isEmpty()) { return; } diff --git a/fe/src/main/java/org/apache/doris/load/Load.java b/fe/src/main/java/org/apache/doris/load/Load.java index 5aa11dd32a6f71..dadb1ee1354a7f 100644 --- a/fe/src/main/java/org/apache/doris/load/Load.java +++ b/fe/src/main/java/org/apache/doris/load/Load.java @@ -651,11 +651,15 @@ public static void checkAndCreateSource(Database db, DataDescription dataDescrip for (Column column : tableSchema) { nameToTableColumn.put(column.getName(), column); } - - // source columns List columnNames = Lists.newArrayList(); - List assignColumnNames = dataDescription.getColumnNames(); - if (assignColumnNames == null) { + List assignColumnNames = Lists.newArrayList(); + if (dataDescription.getColumnNames() != null) { + assignColumnNames.addAll(dataDescription.getColumnNames()); + } + if (dataDescription.getColumnsFromPath() != null) { + assignColumnNames.addAll(dataDescription.getColumnsFromPath()); + } + if (assignColumnNames.isEmpty()) { // use table columns for (Column column : tableSchema) { columnNames.add(column.getName()); diff --git a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java index 3cda72adf7425d..fc4b8c779bc135 100644 --- a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -75,6 +75,7 @@ import java.nio.charset.Charset; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; @@ -296,6 +297,7 @@ private void initColumns(ParamCreateContext context) throws UserException { slotDesc.setIsMaterialized(true); // same as ISSUE A slotDesc.setIsNullable(true); + slotDesc.setColumn(new Column(realColName, PrimitiveType.VARCHAR)); params.addToSrc_slot_ids(slotDesc.getId().asInt()); slotDescByName.put(realColName, slotDesc); } @@ -636,7 +638,7 @@ private TFileFormatType formatType(String fileFormat, String path) { // If fileFormat is not null, we use fileFormat instead of check file's suffix private void processFileGroup( - String fileFormat, + BrokerFileGroup fileGroup, TBrokerScanRangeParams params, List fileStatuses) throws UserException { @@ -651,16 +653,17 @@ private void processFileGroup( TBrokerFileStatus fileStatus = fileStatuses.get(i); long leftBytes = fileStatus.size - curFileOffset; long tmpBytes = curInstanceBytes + leftBytes; - TFileFormatType formatType = formatType(fileFormat, fileStatus.path); + TFileFormatType formatType = formatType(fileGroup.getFileFormat(), fileStatus.path); + Map columnsFromPath = parseColumnsFromPath(fileStatus.path, fileGroup.getColumnsFromPath()); if (tmpBytes > bytesPerInstance) { // Now only support split plain text if (formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable) { long rangeBytes = bytesPerInstance - curInstanceBytes; - TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType, rangeBytes); + TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType, rangeBytes, columnsFromPath); 
brokerScanRange(curLocations).addToRanges(rangeDesc); curFileOffset += rangeBytes; } else { - TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType, leftBytes); + TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType, leftBytes, columnsFromPath); brokerScanRange(curLocations).addToRanges(rangeDesc); curFileOffset = 0; i++; @@ -672,7 +675,7 @@ private void processFileGroup( curInstanceBytes = 0; } else { - TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType, leftBytes); + TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType, leftBytes, columnsFromPath); brokerScanRange(curLocations).addToRanges(rangeDesc); curFileOffset = 0; curInstanceBytes += leftBytes; @@ -687,7 +690,7 @@ private void processFileGroup( } private TBrokerRangeDesc createBrokerRangeDesc(long curFileOffset, TBrokerFileStatus fileStatus, - TFileFormatType formatType, long rangeBytes) { + TFileFormatType formatType, long rangeBytes, Map columnsFromPath) { TBrokerRangeDesc rangeDesc = new TBrokerRangeDesc(); rangeDesc.setFile_type(TFileType.FILE_BROKER); rangeDesc.setFormat_type(formatType); @@ -696,9 +699,40 @@ private TBrokerRangeDesc createBrokerRangeDesc(long curFileOffset, TBrokerFileSt rangeDesc.setStart_offset(curFileOffset); rangeDesc.setSize(rangeBytes); rangeDesc.setFile_size(fileStatus.size); + rangeDesc.setColumns_from_path(columnsFromPath); return rangeDesc; } + private Map parseColumnsFromPath(String filePath, List columnsFromPath) throws UserException { + if (columnsFromPath == null || columnsFromPath.isEmpty()) { + return Collections.emptyMap(); + } + String[] strings = filePath.split("/"); + Map columns = new HashMap<>(); + for (int i = strings.length - 1; i >= 0; i--) { + String str = strings[i]; + if (str == null || str.isEmpty() || !str.contains("=")) { + continue; + } + String[] pair = str.split("="); + if (pair.length != 2) { + continue; + } + if (!columnsFromPath.contains(pair[0])) { + continue; + } + columns.put(pair[0], pair[1]); + if (columns.size() > columnsFromPath.size()) { + break; + } + } + if (columns.size() != columnsFromPath.size()) { + throw new UserException("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath); + } + LOG.info("columns from path: " + columns + ", filePath: " + filePath); + return columns; + } + @Override public void finalize(Analyzer analyzer) throws UserException { locationsList = Lists.newArrayList(); @@ -714,7 +748,7 @@ public void finalize(Analyzer analyzer) throws UserException { } catch (AnalysisException e) { throw new UserException(e.getMessage()); } - processFileGroup(context.fileGroup.getFileFormat(), context.params, fileStatuses); + processFileGroup(context.fileGroup, context.params, fileStatuses); } if (LOG.isDebugEnabled()) { for (TScanRangeLocations locations : locationsList) { @@ -766,3 +800,5 @@ protected String getNodeExplainString(String prefix, TExplainLevel detailLevel) } } + + diff --git a/fe/src/main/jflex/sql_scanner.flex b/fe/src/main/jflex/sql_scanner.flex index 07d1fb61999444..fb80ff749474b2 100644 --- a/fe/src/main/jflex/sql_scanner.flex +++ b/fe/src/main/jflex/sql_scanner.flex @@ -155,6 +155,7 @@ import org.apache.doris.common.util.SqlUtils; keywordMap.put("following", new Integer(SqlParserSymbols.KW_FOLLOWING)); keywordMap.put("for", new Integer(SqlParserSymbols.KW_FOR)); keywordMap.put("format", new Integer(SqlParserSymbols.KW_FORMAT)); + keywordMap.put("path", new 
Integer(SqlParserSymbols.KW_PATH)); keywordMap.put("from", new Integer(SqlParserSymbols.KW_FROM)); keywordMap.put("frontend", new Integer(SqlParserSymbols.KW_FRONTEND)); keywordMap.put("frontends", new Integer(SqlParserSymbols.KW_FRONTENDS)); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index d587e34b58b8c9..1ad19717a75acd 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -114,6 +114,8 @@ struct TBrokerRangeDesc { 7: optional Types.TUniqueId load_id // total size of the file 8: optional i64 file_size + // columns parsed from file path + 9: optional map columns_from_path } struct TBrokerScanRangeParams { From 1f458870cbcf7fd0e4f406fdd14ec93d4a38ba23 Mon Sep 17 00:00:00 2001 From: yuanlihan Date: Tue, 13 Aug 2019 20:38:00 +0800 Subject: [PATCH 02/14] Enable parsing columns from file path for Broker Load --- be/src/exec/broker_scanner.cpp | 29 ++++++++++------ be/src/exec/broker_scanner.h | 1 + be/src/exec/parquet_reader.cpp | 32 ++++++++++------- be/src/exec/parquet_reader.h | 1 + .../sql-statements/Data Manipulation/LOAD.md | 16 ++++----- fe/src/main/cup/sql_parser.cup | 2 +- .../doris/analysis/DataDescription.java | 3 +- .../apache/doris/load/BrokerFileGroup.java | 1 + .../apache/doris/planner/BrokerScanNode.java | 34 +++++++++++-------- gensrc/thrift/PlanNodes.thrift | 2 +- 10 files changed, 72 insertions(+), 49 deletions(-) diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index e5d67b1bd34a05..86d270b518a114 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -459,6 +459,7 @@ bool BrokerScanner::convert_one_row( inline void BrokerScanner::fill_slot(SlotDescriptor* slot_desc, const Slice& value) { if (slot_desc->is_nullable() && is_null(value)) { _src_tuple->set_null(slot_desc->null_indicator_offset()); + return; } _src_tuple->set_not_null(slot_desc->null_indicator_offset()); void* slot = _src_tuple->get_slot(slot_desc->tuple_offset()); @@ -467,6 +468,19 @@ inline void BrokerScanner::fill_slot(SlotDescriptor* slot_desc, const Slice& val str_slot->len = value.size; } +inline void BrokerScanner::fill_slots_of_columns_from_path(int start) { + // values of columns from path can not be null + for (int i = start; i < _src_slot_descs.size(); ++i) { + auto slot_desc = _src_slot_descs[i]; + _src_tuple->set_not_null(slot_desc->null_indicator_offset()); + void* slot = _src_tuple->get_slot(slot_desc->tuple_offset()); + StringValue* str_slot = reinterpret_cast(slot); + const std::string& column_from_path = _columns_from_path[i - start]; + str_slot->ptr = column_from_path.c_str(); + str_slot->len = column_from_path.size(); + } +} + // Convert one row to this tuple bool BrokerScanner::line_to_src_tuple(const Slice& line) { @@ -505,19 +519,14 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) { } int file_column_index = 0; - for (int i = 0; i < _src_slot_descs.size(); ++i) { + for (; file_column_index < values.size(); ++file_column_index) { auto slot_desc = _src_slot_descs[i]; - auto iter = _columns_from_path.find(slot_desc->col_name()); - if (iter != _columns_from_path.end()) { - std::string partitioned_field = iter->second; - const Slice value = Slice(partitioned_field.c_str(), partitioned_field.size()); - fill_slot(slot_desc, value); - } else { - const Slice& value = values[file_column_index++]; - fill_slot(slot_desc, value); - } + const Slice& value = values[file_column_index]; + fill_slot(slot_desc, value); } + fill_slots_of_columns_from_path(file_column_index); + 
return true; } diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h index e21db9263554eb..941d342aec7183 100644 --- a/be/src/exec/broker_scanner.h +++ b/be/src/exec/broker_scanner.h @@ -105,6 +105,7 @@ class BrokerScanner : public BaseScanner { Status line_to_src_tuple(); void fill_slot(SlotDescriptor* slot_desc, const Slice& value); + void fill_slots_of_columns_from_path(int start); bool line_to_src_tuple(const Slice& line); private:; const std::vector& _ranges; diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/parquet_reader.cpp index 1ae97530928899..6ce9da9fc94dd9 100644 --- a/be/src/exec/parquet_reader.cpp +++ b/be/src/exec/parquet_reader.cpp @@ -204,27 +204,30 @@ Status ParquetReaderWrap::handle_timestamp(const std::shared_ptrset_not_null(slot_desc->null_indicator_offset()); + void* slot = _src_tuple->get_slot(slot_desc->tuple_offset()); + StringValue* str_slot = reinterpret_cast(slot); + const std::string& column_from_path = _columns_from_path[i - start]; + str_slot->ptr = column_from_path.c_str(); + str_slot->len = column_from_path.size(); + } +} + Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& tuple_slot_descs, MemPool* mem_pool, bool* eof) { uint8_t tmp_buf[128] = {0}; int32_t wbytes = 0; const uint8_t *value = nullptr; - int index = 0; int column_index = 0; try { - size_t slots = tuple_slot_descs.size(); + size_t slots = _parquet_column_ids.size(); for (size_t i = 0; i < slots; ++i) { auto slot_desc = tuple_slot_descs[i]; - auto iter = _columns_from_path.find(slot_desc->col_name()); - if (iter != _columns_from_path.end()) { - std::string partitioned_field = iter->second; - value = reinterpret_cast(partitioned_field.c_str()); - wbytes = partitioned_field.size(); - fill_slot(tuple, slot_desc, mem_pool, value, wbytes); - continue; - } else { - column_index = index++; // column index in batch record - } - switch (_parquet_column_type[column_index]) { + column_index = i;// column index in batch record + switch (_parquet_column_type[i]) { case arrow::Type::type::STRING: { auto str_array = std::dynamic_pointer_cast(_batch->column(column_index)); if (str_array->IsNull(_current_line_of_group)) { @@ -458,6 +461,9 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& return Status::InternalError(str_error.str()); } + ++column_index; + fill_slots_of_columns_from_path(column_index); + // update data value ++_current_line_of_group; return read_record_batch(tuple_slot_descs, eof); diff --git a/be/src/exec/parquet_reader.h b/be/src/exec/parquet_reader.h index d1445e63870706..daaece0b7cd7b2 100644 --- a/be/src/exec/parquet_reader.h +++ b/be/src/exec/parquet_reader.h @@ -83,6 +83,7 @@ class ParquetReaderWrap { Status set_field_null(Tuple* tuple, const SlotDescriptor* slot_desc); Status read_record_batch(const std::vector& tuple_slot_descs, bool* eof); Status handle_timestamp(const std::shared_ptr& ts_array, uint8_t *buf, int32_t *wbtyes); + void fill_slots_of_columns_from_path(int start); private: const std::map& _columns_from_path; diff --git a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/LOAD.md b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/LOAD.md index 07a1ef3ab11918..865bfc3df431b9 100644 --- a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/LOAD.md +++ b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/LOAD.md @@ -48,8 +48,8 @@ [PARTITION (p1, p2)] [COLUMNS TERMINATED BY "column_separator"] [FORMAT AS "file_type"] - [COLUMNS FROM PATH AS 
(columns_from_path)] [(column_list)] + [COLUMNS FROM PATH AS (columns_from_path)] [SET (k1 = func(k2))] 说明: @@ -75,12 +75,6 @@ file_type: 用于指定导入文件的类型,例如:parquet、csv。默认值通过文件后缀名判断。 - - columns_from_path: - - 用于指定需要从文件路径中解析的字段。 - 语法: - (col_from_path_name1, col_from_path_name2, ...) column_list: @@ -88,6 +82,12 @@ 当需要跳过导入文件中的某一列时,将该列指定为 table 中不存在的列名即可。 语法: (col_name1, col_name2, ...) + + columns_from_path: + + 用于指定需要从文件路径中解析的字段。 + 语法: + (col_from_path_name1, col_from_path_name2, ...) SET: @@ -293,8 +293,8 @@ DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/dir/city=beijing/*/*") INTO TABLE `my_table` FORMAT AS "csv" - COLUMNS FROM PATH AS (city, utc_date) (k1, k2, k3) + COLUMNS FROM PATH AS (city, utc_date) SET (uniq_id = md5sum(k1, city)) ) WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password"); diff --git a/fe/src/main/cup/sql_parser.cup b/fe/src/main/cup/sql_parser.cup index 735bca9a857dc8..8ad39bdf80865a 100644 --- a/fe/src/main/cup/sql_parser.cup +++ b/fe/src/main/cup/sql_parser.cup @@ -1050,8 +1050,8 @@ data_desc ::= opt_partitions:partitionNames opt_field_term:colSep opt_file_format:fileFormat - opt_columns_from_path:columnsFromPath opt_col_list:colList + opt_columns_from_path:columnsFromPath opt_col_mapping_list:colMappingList {: RESULT = new DataDescription(tableName, partitionNames, files, colList, colSep, fileFormat, columnsFromPath, isNeg, colMappingList); diff --git a/fe/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/src/main/java/org/apache/doris/analysis/DataDescription.java index cb5119f58294fc..1d8c02e47ae5bb 100644 --- a/fe/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -52,8 +52,8 @@ // [PARTITION (p1, p2)] // [COLUMNS TERMINATED BY separator] // [FORMAT AS format] -// [COLUMNS FROM PATH AS (col1, ...)] // [(tmp_col1, tmp_col2, col3, ...)] +// [COLUMNS FROM PATH AS (col1, ...)] // [SET (k1=f1(xx), k2=f2(xxx))] /** @@ -228,6 +228,7 @@ private void analyzeColumns() throws AnalysisException { // merge columns exprs from columns and columnMappingList // used to check duplicated column name Set columnNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + // Order of parsedColumnExprList: columns(fileFieldNames) + columnsFromPath parsedColumnExprList = Lists.newArrayList(); // Step1: analyze columns for (String columnName : columnList) { diff --git a/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java index 4f934d0c2e40e3..13f92d5e4e2b2d 100644 --- a/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java +++ b/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java @@ -369,6 +369,7 @@ public void readFields(DataInput in) throws IOException { if (fileFieldNames == null || fileFieldNames.isEmpty()) { return; } + // Order of columnExprList: fileFieldNames + columnsFromPath columnExprList = Lists.newArrayList(); for (String columnName : fileFieldNames) { columnExprList.add(new ImportColumnDesc(columnName, null)); diff --git a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java index fc4b8c779bc135..6945b1d8680dfe 100644 --- a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -75,7 +75,6 @@ import java.nio.charset.Charset; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import 
java.util.List; import java.util.Map; import java.util.Random; @@ -654,7 +653,7 @@ private void processFileGroup( long leftBytes = fileStatus.size - curFileOffset; long tmpBytes = curInstanceBytes + leftBytes; TFileFormatType formatType = formatType(fileGroup.getFileFormat(), fileStatus.path); - Map columnsFromPath = parseColumnsFromPath(fileStatus.path, fileGroup.getColumnsFromPath()); + List columnsFromPath = parseColumnsFromPath(fileStatus.path, fileGroup.getColumnsFromPath()); if (tmpBytes > bytesPerInstance) { // Now only support split plain text if (formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable) { @@ -690,7 +689,7 @@ private void processFileGroup( } private TBrokerRangeDesc createBrokerRangeDesc(long curFileOffset, TBrokerFileStatus fileStatus, - TFileFormatType formatType, long rangeBytes, Map columnsFromPath) { + TFileFormatType formatType, long rangeBytes, List columnsFromPath) { TBrokerRangeDesc rangeDesc = new TBrokerRangeDesc(); rangeDesc.setFile_type(TFileType.FILE_BROKER); rangeDesc.setFormat_type(formatType); @@ -703,34 +702,39 @@ private TBrokerRangeDesc createBrokerRangeDesc(long curFileOffset, TBrokerFileSt return rangeDesc; } - private Map parseColumnsFromPath(String filePath, List columnsFromPath) throws UserException { + private List parseColumnsFromPath(String filePath, List columnsFromPath) throws UserException { if (columnsFromPath == null || columnsFromPath.isEmpty()) { - return Collections.emptyMap(); + return Collections.emptyList(); } String[] strings = filePath.split("/"); - Map columns = new HashMap<>(); - for (int i = strings.length - 1; i >= 0; i--) { + if (strings.length < 2) { + throw new UserException("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath); + } + String[] columns = new String[columnsFromPath.size()]; + int size = 0; + for (int i = strings.length - 2; i >= 0; i--) { String str = strings[i]; if (str == null || str.isEmpty() || !str.contains("=")) { - continue; + throw new UserException("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath); } String[] pair = str.split("="); if (pair.length != 2) { - continue; + throw new UserException("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath); } - if (!columnsFromPath.contains(pair[0])) { + int index = columnsFromPath.indexOf(pair[0]); + if (index == -1) { continue; } - columns.put(pair[0], pair[1]); - if (columns.size() > columnsFromPath.size()) { + columns[index] = pair[1]; + size++; + if (size >= columnsFromPath.size()) { break; } } - if (columns.size() != columnsFromPath.size()) { + if (size != columnsFromPath.size()) { throw new UserException("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath); } - LOG.info("columns from path: " + columns + ", filePath: " + filePath); - return columns; + return Lists.newArrayList(columns); } @Override diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 1ad19717a75acd..1bdabbb4e61bd3 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -115,7 +115,7 @@ struct TBrokerRangeDesc { // total size of the file 8: optional i64 file_size // columns parsed from file path - 9: optional map columns_from_path + 9: optional list columns_from_path } struct TBrokerScanRangeParams { From 1da3654e4ba89f169b525af9dcd8b1e2883b003c Mon Sep 17 00:00:00 2001 From: yuanlihan Date: Tue, 13 Aug 2019 00:09:59 +0800 Subject: [PATCH 03/14] Enable parsing 
columns from file path for Broker Load --- be/src/exec/broker_scanner.cpp | 10 ++--- be/src/exec/broker_scanner.h | 4 +- .../sql-statements/Data Manipulation/LOAD.md | 44 +++++++++---------- fe/src/main/cup/sql_parser.cup | 4 +- .../doris/analysis/DataDescription.java | 22 +++++----- .../apache/doris/planner/BrokerScanNode.java | 12 ++--- gensrc/thrift/PlanNodes.thrift | 38 ++++++++-------- 7 files changed, 67 insertions(+), 67 deletions(-) diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index 86d270b518a114..d5ca3fd7baf512 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -40,7 +40,7 @@ namespace doris { BrokerScanner::BrokerScanner(RuntimeState* state, RuntimeProfile* profile, - const TBrokerScanRangeParams& params, + const TBrokerScanRangeParams& params, const std::vector& ranges, const std::vector& broker_addresses, ScannerCounter* counter) : BaseScanner(state, profile, params, counter), @@ -74,7 +74,7 @@ Status BrokerScanner::open() { Status BrokerScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { SCOPED_TIMER(_read_timer); - // Get one line + // Get one line while (!_scanner_eof) { if (_cur_line_reader == nullptr || _cur_line_reader_eof) { RETURN_IF_ERROR(open_next_reader()); @@ -121,7 +121,7 @@ Status BrokerScanner::open_next_reader() { RETURN_IF_ERROR(open_file_reader()); RETURN_IF_ERROR(open_line_reader()); _next_range++; - + return Status::OK(); } @@ -383,8 +383,8 @@ bool BrokerScanner::check_decimal_input( } bool is_null(const Slice& slice) { - return slice.size == 2 && - slice.data[0] == '\\' && + return slice.size == 2 && + slice.data[0] == '\\' && slice.data[1] == 'N'; } diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h index 941d342aec7183..f506fc84953def 100644 --- a/be/src/exec/broker_scanner.h +++ b/be/src/exec/broker_scanner.h @@ -55,7 +55,7 @@ class BrokerScanner : public BaseScanner { BrokerScanner( RuntimeState* state, RuntimeProfile* profile, - const TBrokerScanRangeParams& params, + const TBrokerScanRangeParams& params, const std::vector& ranges, const std::vector& broker_addresses, ScannerCounter* counter); @@ -126,7 +126,7 @@ private:; bool _scanner_eof; - // When we fetch range doesn't start from 0, + // When we fetch range doesn't start from 0, // we will read to one ahead, and skip the first line bool _skip_next_line; diff --git a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/LOAD.md b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/LOAD.md index 865bfc3df431b9..09997975791710 100644 --- a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/LOAD.md +++ b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/LOAD.md @@ -11,7 +11,7 @@ 本帮助主要描述第一种导入方式,即 Hadoop Load 相关帮助信息。其余导入方式可以使用以下命令查看帮助: !!!该导入方式可能在后续某个版本即不再支持,建议使用其他导入方式进行数据导入。!!! - + 1. help broker load; 2. help mini load; 3. help stream load; @@ -34,7 +34,7 @@ 当前导入批次的标签。在一个 database 内唯一。 语法: [database_name.]your_label - + 2. 
data_desc 用于描述一批导入数据。 @@ -51,9 +51,9 @@ [(column_list)] [COLUMNS FROM PATH AS (columns_from_path)] [SET (k1 = func(k2))] - + 说明: - file_path: + file_path: 文件路径,可以指定到一个文件,也可以用 * 通配符指定某个目录下的所有文件。通配符必须匹配到文件,而不能是目录。 @@ -61,21 +61,21 @@ 如果指定此参数,则只会导入指定的分区,导入分区以外的数据会被过滤掉。 如果不指定,默认导入table的所有分区。 - + NEGATIVE: 如果指定此参数,则相当于导入一批“负”数据。用于抵消之前导入的同一批数据。 该参数仅适用于存在 value 列,并且 value 列的聚合类型仅为 SUM 的情况。 - + column_separator: 用于指定导入文件中的列分隔符。默认为 \t 如果是不可见字符,则需要加\\x作为前缀,使用十六进制来表示分隔符。 如hive文件的分隔符\x01,指定为"\\x01" - + file_type: - 用于指定导入文件的类型,例如:parquet、csv。默认值通过文件后缀名判断。 - + 用于指定导入文件的类型,例如:parquet、csv。默认值通过文件后缀名判断。 + column_list: 用于指定导入文件中的列和 table 中的列的对应关系。 @@ -88,7 +88,7 @@ 用于指定需要从文件路径中解析的字段。 语法: (col_from_path_name1, col_from_path_name2, ...) - + SET: 如果指定此参数,可以将源文件某一列按照函数进行转化,然后将转化后的结果导入到table中。 @@ -114,19 +114,19 @@ default_value(value) 设置某一列导入的默认值 不指定则使用建表时列的默认值 - md5sum(column1, column2, ...) 将指定的导入列的值求md5sum,返回32位16进制字符串 + md5sum(column1, column2, ...) 将指定的导入列的值求md5sum,返回32位16进制字符串 replace_value(old_value[, new_value]) 将导入文件中指定的old_value替换为new_value new_value如不指定则使用建表时列的默认值 - + hll_hash(column) 用于将表或数据里面的某一列转化成HLL列的数据结构 - + 3. opt_properties 用于指定一些特殊参数。 语法: [PROPERTIES ("key"="value", ...)] - + 可以指定如下参数: cluster: 导入所使用的 Hadoop 计算队列。 timeout: 指定导入操作的超时时间。默认超时为3天。单位秒。 @@ -160,7 +160,7 @@ ); 其中 hdfs_host 为 namenode 的 host,hdfs_port 为 fs.defaultFS 端口(默认9000) - + 2. 导入一批数据,包含多个文件。导入不同的 table,指定分隔符,指定列对应关系 LOAD LABEL example_db.label2 @@ -184,7 +184,7 @@ INTO TABLE `my_table` COLUMNS TERMINATED BY "\\x01" ); - + 4. 导入一批“负”数据 LOAD LABEL example_db.label4 @@ -241,13 +241,13 @@ SET ( k1 = strftime("%Y-%m-%d %H:%M:%S", tmp_k1), k2 = time_format("%Y-%m-%d %H:%M:%S", "%Y-%m-%d", tmp_k2), - k3 = alignment_timestamp("day", tmp_k3), - k4 = default_value("1"), + k3 = alignment_timestamp("day", tmp_k3), + k4 = default_value("1"), k5 = md5sum(tmp_k1, tmp_k2, tmp_k3), k6 = replace_value("-", "10") ) ); - + 7. 导入数据到含有HLL列的表,可以是表中的列或者数据里面的列 LOAD LABEL example_db.label7 @@ -284,8 +284,8 @@ FORMAT AS "parquet" (k1, k2, k3) ) - WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password"); - + WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password"); + 9. 
提取文件路径中的压缩字段 如果需要,则会根据表中定义的字段类型解析文件路径中的压缩字段(partitioned fields),类似Spark中Partition Discovery的功能 LOAD LABEL example_db.label10 @@ -304,4 +304,4 @@ ## keyword LOAD - + diff --git a/fe/src/main/cup/sql_parser.cup b/fe/src/main/cup/sql_parser.cup index 8ad39bdf80865a..73985aa3c585d7 100644 --- a/fe/src/main/cup/sql_parser.cup +++ b/fe/src/main/cup/sql_parser.cup @@ -190,7 +190,7 @@ parser code {: :}; // Total keywords of doris -terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_AND, KW_ANTI, KW_AS, KW_ASC, KW_AUTHORS, +terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_AND, KW_ANTI, KW_AS, KW_ASC, KW_AUTHORS, KW_BACKEND, KW_BACKUP, KW_BETWEEN, KW_BEGIN, KW_BIGINT, KW_BOOLEAN, KW_BOTH, KW_BROKER, KW_BACKENDS, KW_BY, KW_CANCEL, KW_CASE, KW_CAST, KW_CHAIN, KW_CHAR, KW_CHARSET, KW_CLUSTER, KW_CLUSTERS, KW_COLLATE, KW_COLLATION, KW_COLUMN, KW_COLUMNS, KW_COMMENT, KW_COMMIT, KW_COMMITTED, @@ -201,7 +201,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_A KW_FALSE, KW_FOLLOWER, KW_FOLLOWING, KW_FREE, KW_FROM, KW_FILE, KW_FIRST, KW_FLOAT, KW_FOR, KW_FORMAT, KW_FRONTEND, KW_FRONTENDS, KW_FULL, KW_FUNCTION, KW_GLOBAL, KW_GRANT, KW_GRANTS, KW_GROUP, KW_HASH, KW_HAVING, KW_HELP,KW_HLL, KW_HLL_UNION, KW_HUB, - KW_IDENTIFIED, KW_IF, KW_IN, KW_INDEX, KW_INDEXES, KW_INFILE, + KW_IDENTIFIED, KW_IF, KW_IN, KW_INDEX, KW_INDEXES, KW_INFILE, KW_INNER, KW_INSERT, KW_INT, KW_INTERMEDIATE, KW_INTERVAL, KW_INTO, KW_IS, KW_ISNULL, KW_ISOLATION, KW_JOIN, KW_KEY, KW_KILL, diff --git a/fe/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/src/main/java/org/apache/doris/analysis/DataDescription.java index 1d8c02e47ae5bb..45da205c4771ca 100644 --- a/fe/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -65,10 +65,10 @@ public class DataDescription { private static final Logger LOG = LogManager.getLogger(DataDescription.class); public static String FUNCTION_HASH_HLL = "hll_hash"; private static final List hadoopSupportFunctionName = Arrays.asList("strftime", "time_format", - "alignment_timestamp", - "default_value", "md5sum", - "replace_value", "now", - "hll_hash"); + "alignment_timestamp", + "default_value", "md5sum", + "replace_value", "now", + "hll_hash"); private final String tableName; private final List partitionNames; private final List filePaths; @@ -251,17 +251,17 @@ private void analyzeColumns() throws AnalysisException { if (!(columnExpr instanceof BinaryPredicate)) { throw new AnalysisException("Mapping function expr only support the column or eq binary predicate. " - + "Expr: " + columnExpr.toSql()); + + "Expr: " + columnExpr.toSql()); } BinaryPredicate predicate = (BinaryPredicate) columnExpr; if (predicate.getOp() != Operator.EQ) { throw new AnalysisException("Mapping function expr only support the column or eq binary predicate. " - + "The mapping operator error, op: " + predicate.getOp()); + + "The mapping operator error, op: " + predicate.getOp()); } Expr child0 = predicate.getChild(0); if (!(child0 instanceof SlotRef)) { throw new AnalysisException("Mapping function expr only support the column or eq binary predicate. " - + "The mapping column error. column: " + child0.toSql()); + + "The mapping column error. 
column: " + child0.toSql()); } String column = ((SlotRef) child0).getColumnName(); if (!columnNames.add(column)) { @@ -271,7 +271,7 @@ private void analyzeColumns() throws AnalysisException { Expr child1 = predicate.getChild(1); if (isHadoopLoad && !(child1 instanceof FunctionCallExpr)) { throw new AnalysisException("Hadoop load only supports the designated function. " - + "The error mapping function is:" + child1.toSql()); + + "The error mapping function is:" + child1.toSql()); } ImportColumnDesc importColumnDesc = new ImportColumnDesc(column, child1); parsedColumnExprList.add(importColumnDesc); @@ -471,10 +471,10 @@ private void checkLoadPriv(String fullDbName) throws AnalysisException { // check auth if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), fullDbName, tableName, - PrivPredicate.LOAD)) { + PrivPredicate.LOAD)) { ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD", - ConnectContext.get().getQualifiedUser(), - ConnectContext.get().getRemoteIP(), tableName); + ConnectContext.get().getQualifiedUser(), + ConnectContext.get().getRemoteIP(), tableName); } } diff --git a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java index 6945b1d8680dfe..b9cd7643bf537b 100644 --- a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -139,7 +139,7 @@ private static class ParamCreateContext { private List paramCreateContexts; public BrokerScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, - List> fileStatusesList, int filesAdded) { + List> fileStatusesList, int filesAdded) { super(id, desc, planNodeName); this.fileStatusesList = fileStatusesList; this.filesAdded = filesAdded; @@ -310,7 +310,7 @@ private void initColumns(ParamCreateContext context) throws UserException { SlotDescriptor slotDesc = slotDescByName.get(slot.getColumnName()); if (slotDesc == null) { throw new UserException("unknown reference column, column=" + entry.getKey() - + ", reference=" + slot.getColumnName()); + + ", reference=" + slot.getColumnName()); } smap.getLhs().add(slot); smap.getRhs().add(new SlotRef(slotDesc)); @@ -470,7 +470,7 @@ private void finalizeParams(ParamCreateContext context) throws UserException, An expr = NullLiteral.create(column.getType()); } else { throw new UserException("Unknown slot ref(" - + destSlotDesc.getColumn().getName() + ") in source file"); + + destSlotDesc.getColumn().getName() + ") in source file"); } } } @@ -480,12 +480,12 @@ private void finalizeParams(ParamCreateContext context) throws UserException, An if (destSlotDesc.getType().getPrimitiveType() == PrimitiveType.HLL) { if (!(expr instanceof FunctionCallExpr)) { throw new AnalysisException("HLL column must use hll_hash function, like " - + destSlotDesc.getColumn().getName() + "=hll_hash(xxx)"); + + destSlotDesc.getColumn().getName() + "=hll_hash(xxx)"); } FunctionCallExpr fn = (FunctionCallExpr) expr; if (!fn.getFnName().getFunction().equalsIgnoreCase("hll_hash")) { throw new AnalysisException("HLL column must use hll_hash function, like " - + destSlotDesc.getColumn().getName() + "=hll_hash(xxx)"); + + destSlotDesc.getColumn().getName() + "=hll_hash(xxx)"); } expr.setType(Type.HLL); } @@ -689,7 +689,7 @@ private void processFileGroup( } private TBrokerRangeDesc createBrokerRangeDesc(long curFileOffset, TBrokerFileStatus fileStatus, - TFileFormatType formatType, long rangeBytes, List columnsFromPath) { + 
TFileFormatType formatType, long rangeBytes, List columnsFromPath) { TBrokerRangeDesc rangeDesc = new TBrokerRangeDesc(); rangeDesc.setFile_type(TFileType.FILE_BROKER); rangeDesc.setFormat_type(formatType); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 1bdabbb4e61bd3..62a5df13f197fc 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -41,7 +41,7 @@ enum TPlanNodeType { OLAP_REWRITE_NODE, KUDU_SCAN_NODE, BROKER_SCAN_NODE, - EMPTY_SET_NODE, + EMPTY_SET_NODE, UNION_NODE, ES_SCAN_NODE, ES_HTTP_SCAN_NODE @@ -122,8 +122,8 @@ struct TBrokerScanRangeParams { 1: required byte column_separator; 2: required byte line_delimiter; - // We construct one line in file to a tuple. And each field of line - // correspond to a slot in this tuple. + // We construct one line in file to a tuple. And each field of line + // correspond to a slot in this tuple. // src_tuple_id is the tuple id of the input file 3: required Types.TTupleId src_tuple_id // src_slot_ids is the slot_ids of the input file @@ -141,7 +141,7 @@ struct TBrokerScanRangeParams { // If partition_ids is set, data that doesn't in this partition will be filtered. 8: optional list partition_ids - + // This is the mapping of dest slot id and src slot id in load expr // It excludes the slot id which has the transform expr 9: optional map dest_sid_to_src_sid_without_trans @@ -162,7 +162,7 @@ struct TEsScanRange { 1: required list es_hosts // es hosts is used by be scan node to connect to es // has to set index and type here, could not set it in scannode // because on scan node maybe scan an es alias then it contains one or more indices - 2: required string index + 2: required string index 3: optional string type 4: required i32 shard_id } @@ -324,7 +324,7 @@ enum TAggregationOp { DENSE_RANK, ROW_NUMBER, LAG, - HLL_C, + HLL_C, } //struct TAggregateFunctionCall { @@ -383,16 +383,16 @@ struct TSortNode { 3: optional i64 offset // TODO(lingbin): remove blew, because duplaicate with TSortInfo - 4: optional list ordering_exprs - 5: optional list is_asc_order - // Indicates whether the imposed limit comes DEFAULT_ORDER_BY_LIMIT. - 6: optional bool is_default_limit - // Indicates, for each expr, if nulls should be listed first or last. This is - // independent of is_asc_order. - 7: optional list nulls_first + 4: optional list ordering_exprs + 5: optional list is_asc_order + // Indicates whether the imposed limit comes DEFAULT_ORDER_BY_LIMIT. + 6: optional bool is_default_limit + // Indicates, for each expr, if nulls should be listed first or last. This is + // independent of is_asc_order. + 7: optional list nulls_first // Expressions evaluated over the input row that materialize the tuple to be so - // Contains one expr per slot in the materialized tuple. - 8: optional list sort_tuple_slot_exprs + // Contains one expr per slot in the materialized tuple. + 8: optional list sort_tuple_slot_exprs } enum TAnalyticWindowType { @@ -533,7 +533,7 @@ struct TBackendResourceProfile { // The maximum reservation for this plan node in bytes. MAX_INT64 means effectively // unlimited. -2: required i64 max_reservation = 12188490189880; // no max reservation limit +2: required i64 max_reservation = 12188490189880; // no max reservation limit // The spillable buffer size in bytes to use for this node, chosen by the planner. // Set iff the node uses spillable buffers. 
@@ -568,9 +568,9 @@ struct TPlanNode { 14: optional TMergeNode merge_node 15: optional TExchangeNode exchange_node 17: optional TMySQLScanNode mysql_scan_node - 18: optional TOlapScanNode olap_scan_node - 19: optional TCsvScanNode csv_scan_node - 20: optional TBrokerScanNode broker_scan_node + 18: optional TOlapScanNode olap_scan_node + 19: optional TCsvScanNode csv_scan_node + 20: optional TBrokerScanNode broker_scan_node 21: optional TPreAggregationNode pre_agg_node 22: optional TSchemaScanNode schema_scan_node 23: optional TMergeJoinNode merge_join_node From 3c6eced4e4b3fc542d1cba2216ce5bb3cdc1f8bc Mon Sep 17 00:00:00 2001 From: yuanlihan Date: Tue, 13 Aug 2019 21:28:06 +0800 Subject: [PATCH 04/14] Enable parsing columns from file path for Broker Load --- be/src/exec/broker_scanner.cpp | 4 ++-- be/src/exec/broker_scanner.h | 2 +- be/src/exec/parquet_reader.cpp | 2 +- be/src/exec/parquet_reader.h | 2 +- be/test/exec/broker_scan_node_test.cpp | 4 ++-- be/test/exec/parquet_scanner_test.cpp | 2 +- 6 files changed, 8 insertions(+), 8 deletions(-) diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index d5ca3fd7baf512..cdb0e5f39b2aa1 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -476,7 +476,7 @@ inline void BrokerScanner::fill_slots_of_columns_from_path(int start) { void* slot = _src_tuple->get_slot(slot_desc->tuple_offset()); StringValue* str_slot = reinterpret_cast(slot); const std::string& column_from_path = _columns_from_path[i - start]; - str_slot->ptr = column_from_path.c_str(); + str_slot->ptr = reinterpret_cast(column_from_path.c_str()); str_slot->len = column_from_path.size(); } } @@ -520,7 +520,7 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) { int file_column_index = 0; for (; file_column_index < values.size(); ++file_column_index) { - auto slot_desc = _src_slot_descs[i]; + auto slot_desc = _src_slot_descs[file_column_index]; const Slice& value = values[file_column_index]; fill_slot(slot_desc, value); } diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h index f506fc84953def..7ee08cba9bc7e6 100644 --- a/be/src/exec/broker_scanner.h +++ b/be/src/exec/broker_scanner.h @@ -122,7 +122,7 @@ private:; Decompressor* _cur_decompressor; int _next_range; bool _cur_line_reader_eof; - std::map _columns_from_path; + std::vector _columns_from_path; bool _scanner_eof; diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/parquet_reader.cpp index 6ce9da9fc94dd9..137f0612000089 100644 --- a/be/src/exec/parquet_reader.cpp +++ b/be/src/exec/parquet_reader.cpp @@ -34,7 +34,7 @@ namespace doris { // Broker -ParquetReaderWrap::ParquetReaderWrap(FileReader *file_reader, const std::map& columns_from_path) : +ParquetReaderWrap::ParquetReaderWrap(FileReader *file_reader, const std::vector& columns_from_path) : _columns_from_path(columns_from_path), _total_groups(0), _current_group(0), _rows_of_group(0), _current_line_of_group(0) { _parquet = std::shared_ptr(new ParquetFile(file_reader)); _properties = parquet::ReaderProperties(); diff --git a/be/src/exec/parquet_reader.h b/be/src/exec/parquet_reader.h index daaece0b7cd7b2..9a4b17194c3ff5 100644 --- a/be/src/exec/parquet_reader.h +++ b/be/src/exec/parquet_reader.h @@ -86,7 +86,7 @@ class ParquetReaderWrap { void fill_slots_of_columns_from_path(int start); private: - const std::map& _columns_from_path; + const std::vector& _columns_from_path; parquet::ReaderProperties _properties; std::shared_ptr _parquet; diff --git a/be/test/exec/broker_scan_node_test.cpp 
b/be/test/exec/broker_scan_node_test.cpp index 0bc51602c3dd73..a83a4b1af71a58 100644 --- a/be/test/exec/broker_scan_node_test.cpp +++ b/be/test/exec/broker_scan_node_test.cpp @@ -420,7 +420,7 @@ TEST_F(BrokerScanNodeTest, normal) { range.file_type = TFileType::FILE_LOCAL; range.format_type = TFileFormatType::FORMAT_CSV_PLAIN; range.splittable = true; - std::map columns_from_path = {{"k4", "1"}}; + std::vector columns_from_path{"1"}; range.__set_columns_from_path(columns_from_path); broker_scan_range.ranges.push_back(range); @@ -441,7 +441,7 @@ TEST_F(BrokerScanNodeTest, normal) { range.file_type = TFileType::FILE_LOCAL; range.format_type = TFileFormatType::FORMAT_CSV_PLAIN; range.splittable = true; - std::map columns_from_path = {{"k4", "2"}}; + std::vector columns_from_path{"2"}; range.__set_columns_from_path(columns_from_path); broker_scan_range.ranges.push_back(range); diff --git a/be/test/exec/parquet_scanner_test.cpp b/be/test/exec/parquet_scanner_test.cpp index d91eec40eccb2d..a6d1911de02d79 100644 --- a/be/test/exec/parquet_scanner_test.cpp +++ b/be/test/exec/parquet_scanner_test.cpp @@ -436,7 +436,7 @@ TEST_F(ParquetSannerTest, normal) { range.format_type = TFileFormatType::FORMAT_PARQUET; range.splittable = true; - std::map columns_from_path = {{"partition_column", "value"}}; + std::vector columns_from_path{"value"}; range.__set_columns_from_path(columns_from_path); #if 1 range.path = "./be/test/exec/test_data/parquet_scanner/localfile.parquet"; From a6ad01d4a1c93fa290967c0aa41b34fafff89428 Mon Sep 17 00:00:00 2001 From: yuanlihan Date: Tue, 13 Aug 2019 21:34:57 +0800 Subject: [PATCH 05/14] Enable parsing columns from file path for Broker Load --- be/src/exec/broker_scanner.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index cdb0e5f39b2aa1..4c0f241cfd5646 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -468,7 +468,7 @@ inline void BrokerScanner::fill_slot(SlotDescriptor* slot_desc, const Slice& val str_slot->len = value.size; } -inline void BrokerScanner::fill_slots_of_columns_from_path(int start) { +inline void ParquetReaderWrap::fill_slots_of_columns_from_path(int start) { // values of columns from path can not be null for (int i = start; i < _src_slot_descs.size(); ++i) { auto slot_desc = _src_slot_descs[i]; @@ -476,7 +476,7 @@ inline void BrokerScanner::fill_slots_of_columns_from_path(int start) { void* slot = _src_tuple->get_slot(slot_desc->tuple_offset()); StringValue* str_slot = reinterpret_cast(slot); const std::string& column_from_path = _columns_from_path[i - start]; - str_slot->ptr = reinterpret_cast(column_from_path.c_str()); + str_slot->ptr = column_from_path.c_str(); str_slot->len = column_from_path.size(); } } From d9bbb1b602c1996a61024c21fd5b876645632cff Mon Sep 17 00:00:00 2001 From: yuanlihan Date: Tue, 13 Aug 2019 22:28:17 +0800 Subject: [PATCH 06/14] Enable parsing columns from file path for Broker Load --- be/src/exec/broker_scanner.cpp | 14 +++++++------- be/src/exec/broker_scanner.h | 2 +- be/src/exec/parquet_reader.cpp | 17 ++++++++--------- be/src/exec/parquet_reader.h | 4 ++-- 4 files changed, 18 insertions(+), 19 deletions(-) diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index 4c0f241cfd5646..1d21d53e99ea86 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -468,15 +468,15 @@ inline void BrokerScanner::fill_slot(SlotDescriptor* slot_desc, const Slice& val 
str_slot->len = value.size; } -inline void ParquetReaderWrap::fill_slots_of_columns_from_path(int start) { +inline void BrokerScanner::fill_slots_of_columns_from_path(int start, const std::vector& src_slot_descs, Tuple* tuple) { // values of columns from path can not be null - for (int i = start; i < _src_slot_descs.size(); ++i) { - auto slot_desc = _src_slot_descs[i]; - _src_tuple->set_not_null(slot_desc->null_indicator_offset()); - void* slot = _src_tuple->get_slot(slot_desc->tuple_offset()); + for (int i = start; i < src_slot_descs.size(); ++i) { + auto slot_desc = src_slot_descs[i]; + tuple->set_not_null(slot_desc->null_indicator_offset()); + void* slot = tuple->get_slot(slot_desc->tuple_offset()); StringValue* str_slot = reinterpret_cast(slot); const std::string& column_from_path = _columns_from_path[i - start]; - str_slot->ptr = column_from_path.c_str(); + str_slot->ptr = const_cast(column_from_path.c_str()); str_slot->len = column_from_path.size(); } } @@ -525,7 +525,7 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) { fill_slot(slot_desc, value); } - fill_slots_of_columns_from_path(file_column_index); + fill_slots_of_columns_from_path(file_column_index, _src_slot_descs, _src_tuple); return true; } diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h index 7ee08cba9bc7e6..5c5118568334a4 100644 --- a/be/src/exec/broker_scanner.h +++ b/be/src/exec/broker_scanner.h @@ -105,7 +105,7 @@ class BrokerScanner : public BaseScanner { Status line_to_src_tuple(); void fill_slot(SlotDescriptor* slot_desc, const Slice& value); - void fill_slots_of_columns_from_path(int start); + void fill_slots_of_columns_from_path(int start, const std::vector& src_slot_descs, Tuple* tuple); bool line_to_src_tuple(const Slice& line); private:; const std::vector& _ranges; diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/parquet_reader.cpp index 137f0612000089..bf9e17d48891dc 100644 --- a/be/src/exec/parquet_reader.cpp +++ b/be/src/exec/parquet_reader.cpp @@ -128,8 +128,7 @@ Status ParquetReaderWrap::column_indices(const std::vector& tup if (iter != _map_column.end()) { _parquet_column_ids.emplace_back(iter->second); } else { - auto iter_1 = _columns_from_path.find(slot_desc->col_name()); - if (iter_1 == _columns_from_path.end()) { + if (std::find(_columns_from_path.begin(), _columns_from_path.end(), slot_desc->col_name()) != _columns_from_path.end()) { std::stringstream str_error; str_error << "Invalid Column Name:" << slot_desc->col_name(); LOG(WARNING) << str_error.str(); @@ -204,15 +203,15 @@ Status ParquetReaderWrap::handle_timestamp(const std::shared_ptr& src_slot_descs, Tuple* tuple) { // values of columns from path can not be null - for (int i = start; i < _src_slot_descs.size(); ++i) { - auto slot_desc = _src_slot_descs[i]; - _src_tuple->set_not_null(slot_desc->null_indicator_offset()); - void* slot = _src_tuple->get_slot(slot_desc->tuple_offset()); + for (int i = start; i < src_slot_descs.size(); ++i) { + auto slot_desc = src_slot_descs[i]; + tuple->set_not_null(slot_desc->null_indicator_offset()); + void* slot = tuple->get_slot(slot_desc->tuple_offset()); StringValue* str_slot = reinterpret_cast(slot); const std::string& column_from_path = _columns_from_path[i - start]; - str_slot->ptr = column_from_path.c_str(); + str_slot->ptr = const_cast(column_from_path.c_str()); str_slot->len = column_from_path.size(); } } @@ -462,7 +461,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& } ++column_index; - fill_slots_of_columns_from_path(column_index); + 
fill_slots_of_columns_from_path(column_index, tuple_slot_descs, tuple); // update data value ++_current_line_of_group; diff --git a/be/src/exec/parquet_reader.h b/be/src/exec/parquet_reader.h index 9a4b17194c3ff5..80162902c8e046 100644 --- a/be/src/exec/parquet_reader.h +++ b/be/src/exec/parquet_reader.h @@ -68,7 +68,7 @@ class ParquetFile : public arrow::io::RandomAccessFile { // Reader of broker parquet file class ParquetReaderWrap { public: - ParquetReaderWrap(FileReader *file_reader, const std::map& columns_from_path); + ParquetReaderWrap(FileReader *file_reader, const std::vector& columns_from_path); virtual ~ParquetReaderWrap(); // Read @@ -83,7 +83,7 @@ class ParquetReaderWrap { Status set_field_null(Tuple* tuple, const SlotDescriptor* slot_desc); Status read_record_batch(const std::vector& tuple_slot_descs, bool* eof); Status handle_timestamp(const std::shared_ptr& ts_array, uint8_t *buf, int32_t *wbtyes); - void fill_slots_of_columns_from_path(int start); + void fill_slots_of_columns_from_path(int start, const std::vector& src_slot_descs, Tuple* tuple); private: const std::vector& _columns_from_path; From 55b2b92da70895b0b181b7f64244a5a6ea1e5740 Mon Sep 17 00:00:00 2001 From: yuanlihan Date: Wed, 14 Aug 2019 00:15:43 +0800 Subject: [PATCH 07/14] Format text --- .../sql-statements/Data Manipulation/LOAD.md | 44 +++++++++---------- gensrc/thrift/PlanNodes.thrift | 38 ++++++++-------- 2 files changed, 41 insertions(+), 41 deletions(-) diff --git a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/LOAD.md b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/LOAD.md index 09997975791710..865bfc3df431b9 100644 --- a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/LOAD.md +++ b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/LOAD.md @@ -11,7 +11,7 @@ 本帮助主要描述第一种导入方式,即 Hadoop Load 相关帮助信息。其余导入方式可以使用以下命令查看帮助: !!!该导入方式可能在后续某个版本即不再支持,建议使用其他导入方式进行数据导入。!!! - + 1. help broker load; 2. help mini load; 3. help stream load; @@ -34,7 +34,7 @@ 当前导入批次的标签。在一个 database 内唯一。 语法: [database_name.]your_label - + 2. data_desc 用于描述一批导入数据。 @@ -51,9 +51,9 @@ [(column_list)] [COLUMNS FROM PATH AS (columns_from_path)] [SET (k1 = func(k2))] - + 说明: - file_path: + file_path: 文件路径,可以指定到一个文件,也可以用 * 通配符指定某个目录下的所有文件。通配符必须匹配到文件,而不能是目录。 @@ -61,21 +61,21 @@ 如果指定此参数,则只会导入指定的分区,导入分区以外的数据会被过滤掉。 如果不指定,默认导入table的所有分区。 - + NEGATIVE: 如果指定此参数,则相当于导入一批“负”数据。用于抵消之前导入的同一批数据。 该参数仅适用于存在 value 列,并且 value 列的聚合类型仅为 SUM 的情况。 - + column_separator: 用于指定导入文件中的列分隔符。默认为 \t 如果是不可见字符,则需要加\\x作为前缀,使用十六进制来表示分隔符。 如hive文件的分隔符\x01,指定为"\\x01" - + file_type: - 用于指定导入文件的类型,例如:parquet、csv。默认值通过文件后缀名判断。 - + 用于指定导入文件的类型,例如:parquet、csv。默认值通过文件后缀名判断。 + column_list: 用于指定导入文件中的列和 table 中的列的对应关系。 @@ -88,7 +88,7 @@ 用于指定需要从文件路径中解析的字段。 语法: (col_from_path_name1, col_from_path_name2, ...) - + SET: 如果指定此参数,可以将源文件某一列按照函数进行转化,然后将转化后的结果导入到table中。 @@ -114,19 +114,19 @@ default_value(value) 设置某一列导入的默认值 不指定则使用建表时列的默认值 - md5sum(column1, column2, ...) 将指定的导入列的值求md5sum,返回32位16进制字符串 + md5sum(column1, column2, ...) 将指定的导入列的值求md5sum,返回32位16进制字符串 replace_value(old_value[, new_value]) 将导入文件中指定的old_value替换为new_value new_value如不指定则使用建表时列的默认值 - + hll_hash(column) 用于将表或数据里面的某一列转化成HLL列的数据结构 - + 3. opt_properties 用于指定一些特殊参数。 语法: [PROPERTIES ("key"="value", ...)] - + 可以指定如下参数: cluster: 导入所使用的 Hadoop 计算队列。 timeout: 指定导入操作的超时时间。默认超时为3天。单位秒。 @@ -160,7 +160,7 @@ ); 其中 hdfs_host 为 namenode 的 host,hdfs_port 为 fs.defaultFS 端口(默认9000) - + 2. 
导入一批数据,包含多个文件。导入不同的 table,指定分隔符,指定列对应关系 LOAD LABEL example_db.label2 @@ -184,7 +184,7 @@ INTO TABLE `my_table` COLUMNS TERMINATED BY "\\x01" ); - + 4. 导入一批“负”数据 LOAD LABEL example_db.label4 @@ -241,13 +241,13 @@ SET ( k1 = strftime("%Y-%m-%d %H:%M:%S", tmp_k1), k2 = time_format("%Y-%m-%d %H:%M:%S", "%Y-%m-%d", tmp_k2), - k3 = alignment_timestamp("day", tmp_k3), - k4 = default_value("1"), + k3 = alignment_timestamp("day", tmp_k3), + k4 = default_value("1"), k5 = md5sum(tmp_k1, tmp_k2, tmp_k3), k6 = replace_value("-", "10") ) ); - + 7. 导入数据到含有HLL列的表,可以是表中的列或者数据里面的列 LOAD LABEL example_db.label7 @@ -284,8 +284,8 @@ FORMAT AS "parquet" (k1, k2, k3) ) - WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password"); - + WITH BROKER hdfs ("username"="hdfs_user", "password"="hdfs_password"); + 9. 提取文件路径中的压缩字段 如果需要,则会根据表中定义的字段类型解析文件路径中的压缩字段(partitioned fields),类似Spark中Partition Discovery的功能 LOAD LABEL example_db.label10 @@ -304,4 +304,4 @@ ## keyword LOAD - + diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 62a5df13f197fc..1bdabbb4e61bd3 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -41,7 +41,7 @@ enum TPlanNodeType { OLAP_REWRITE_NODE, KUDU_SCAN_NODE, BROKER_SCAN_NODE, - EMPTY_SET_NODE, + EMPTY_SET_NODE, UNION_NODE, ES_SCAN_NODE, ES_HTTP_SCAN_NODE @@ -122,8 +122,8 @@ struct TBrokerScanRangeParams { 1: required byte column_separator; 2: required byte line_delimiter; - // We construct one line in file to a tuple. And each field of line - // correspond to a slot in this tuple. + // We construct one line in file to a tuple. And each field of line + // correspond to a slot in this tuple. // src_tuple_id is the tuple id of the input file 3: required Types.TTupleId src_tuple_id // src_slot_ids is the slot_ids of the input file @@ -141,7 +141,7 @@ struct TBrokerScanRangeParams { // If partition_ids is set, data that doesn't in this partition will be filtered. 8: optional list partition_ids - + // This is the mapping of dest slot id and src slot id in load expr // It excludes the slot id which has the transform expr 9: optional map dest_sid_to_src_sid_without_trans @@ -162,7 +162,7 @@ struct TEsScanRange { 1: required list es_hosts // es hosts is used by be scan node to connect to es // has to set index and type here, could not set it in scannode // because on scan node maybe scan an es alias then it contains one or more indices - 2: required string index + 2: required string index 3: optional string type 4: required i32 shard_id } @@ -324,7 +324,7 @@ enum TAggregationOp { DENSE_RANK, ROW_NUMBER, LAG, - HLL_C, + HLL_C, } //struct TAggregateFunctionCall { @@ -383,16 +383,16 @@ struct TSortNode { 3: optional i64 offset // TODO(lingbin): remove blew, because duplaicate with TSortInfo - 4: optional list ordering_exprs - 5: optional list is_asc_order - // Indicates whether the imposed limit comes DEFAULT_ORDER_BY_LIMIT. - 6: optional bool is_default_limit - // Indicates, for each expr, if nulls should be listed first or last. This is - // independent of is_asc_order. - 7: optional list nulls_first + 4: optional list ordering_exprs + 5: optional list is_asc_order + // Indicates whether the imposed limit comes DEFAULT_ORDER_BY_LIMIT. + 6: optional bool is_default_limit + // Indicates, for each expr, if nulls should be listed first or last. This is + // independent of is_asc_order. 
+ 7: optional list nulls_first // Expressions evaluated over the input row that materialize the tuple to be so - // Contains one expr per slot in the materialized tuple. - 8: optional list sort_tuple_slot_exprs + // Contains one expr per slot in the materialized tuple. + 8: optional list sort_tuple_slot_exprs } enum TAnalyticWindowType { @@ -533,7 +533,7 @@ struct TBackendResourceProfile { // The maximum reservation for this plan node in bytes. MAX_INT64 means effectively // unlimited. -2: required i64 max_reservation = 12188490189880; // no max reservation limit +2: required i64 max_reservation = 12188490189880; // no max reservation limit // The spillable buffer size in bytes to use for this node, chosen by the planner. // Set iff the node uses spillable buffers. @@ -568,9 +568,9 @@ struct TPlanNode { 14: optional TMergeNode merge_node 15: optional TExchangeNode exchange_node 17: optional TMySQLScanNode mysql_scan_node - 18: optional TOlapScanNode olap_scan_node - 19: optional TCsvScanNode csv_scan_node - 20: optional TBrokerScanNode broker_scan_node + 18: optional TOlapScanNode olap_scan_node + 19: optional TCsvScanNode csv_scan_node + 20: optional TBrokerScanNode broker_scan_node 21: optional TPreAggregationNode pre_agg_node 22: optional TSchemaScanNode schema_scan_node 23: optional TMergeJoinNode merge_join_node From 030c1dd07026cdc4772583cddf9fd90c71de4ef8 Mon Sep 17 00:00:00 2001 From: yuanlihan Date: Wed, 14 Aug 2019 19:01:37 +0800 Subject: [PATCH 08/14] Enable parsing columns from file path for Broker Load --- be/src/exec/base_scanner.cpp | 14 ++++++++ be/src/exec/base_scanner.h | 2 ++ be/src/exec/broker_scanner.cpp | 32 +++++-------------- be/src/exec/broker_scanner.h | 2 -- be/src/exec/parquet_reader.cpp | 28 ++++------------ be/src/exec/parquet_reader.h | 1 - be/src/exec/parquet_scanner.cpp | 2 ++ .../apache/doris/planner/BrokerScanNode.java | 27 +++++++++------- gensrc/thrift/PlanNodes.thrift | 6 ++-- 9 files changed, 52 insertions(+), 62 deletions(-) diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index 737c196736c5c3..440d88f6b38331 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -179,4 +179,18 @@ bool BaseScanner::fill_dest_tuple(const Slice& line, Tuple* dest_tuple, MemPool* } return true; } + +void BaseScanner::fill_slots_of_columns_from_path(int start, const std::vector& columns_from_path) { + // values of columns from path can not be null + for (int i = start; i < _src_slot_descs.size(); ++i) { + auto slot_desc = _src_slot_descs[i]; + _src_tuple->set_not_null(slot_desc->null_indicator_offset()); + void* slot = _src_tuple->get_slot(slot_desc->tuple_offset()); + StringValue* str_slot = reinterpret_cast(slot); + const std::string& column_from_path = columns_from_path[i - start]; + str_slot->ptr = const_cast(column_from_path.c_str()); + str_slot->len = column_from_path.size(); + } +} + } diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index 8078c36c6e8f0c..884df553b0103f 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -64,6 +64,8 @@ class BaseScanner { virtual void close() = 0; bool fill_dest_tuple(const Slice& line, Tuple* dest_tuple, MemPool* mem_pool); + void fill_slots_of_columns_from_path(int start, const std::vector& columns_from_path); + protected: RuntimeState* _state; const TBrokerScanRangeParams& _params; diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index 1d21d53e99ea86..23e70ddfafb591 100644 --- a/be/src/exec/broker_scanner.cpp 
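Note: the helper added to BaseScanner above is what both the CSV (broker) and Parquet scanners now share. The layout it assumes is that the first num_of_columns_from_file source slots hold values read from the file, and the slots after them hold the values parsed from the file path, in the order given by COLUMNS FROM PATH AS. A minimal Java sketch of that layout; the class, method and sample values below are illustrative only and are not part of the patch:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class SrcRowLayoutSketch {
        // Mirrors the contract of fill_slots_of_columns_from_path(start, columns_from_path):
        // slot (start + i) receives columns_from_path[i], and path values are never null.
        static List<String> assembleSrcRow(List<String> valuesFromFile, List<String> columnsFromPath) {
            List<String> srcRow = new ArrayList<>(valuesFromFile);
            srcRow.addAll(columnsFromPath); // path-derived columns always follow the file columns
            return srcRow;
        }

        public static void main(String[] args) {
            // e.g. a CSV line "1,100,2.5" located under .../city=beijing/utc_date=2019-08-15/
            List<String> fromFile = Arrays.asList("1", "100", "2.5");
            List<String> fromPath = Arrays.asList("beijing", "2019-08-15");
            System.out.println(assembleSrcRow(fromFile, fromPath));
            // [1, 100, 2.5, beijing, 2019-08-15]; the "start" argument equals
            // num_of_columns_from_file (3 in this sample)
        }
    }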
+++ b/be/src/exec/broker_scanner.cpp @@ -54,7 +54,6 @@ BrokerScanner::BrokerScanner(RuntimeState* state, _cur_decompressor(nullptr), _next_range(0), _cur_line_reader_eof(false), - _columns_from_path(), _scanner_eof(false), _skip_next_line(false) { } @@ -237,9 +236,6 @@ Status BrokerScanner::open_line_reader() { // _decompressor may be NULL if this is not a compressed file RETURN_IF_ERROR(create_decompressor(range.format_type)); - // set columns parsed from this file path - _columns_from_path = range.columns_from_path; - // open line reader switch (range.format_type) { case TFileFormatType::FORMAT_CSV_PLAIN: @@ -468,19 +464,6 @@ inline void BrokerScanner::fill_slot(SlotDescriptor* slot_desc, const Slice& val str_slot->len = value.size; } -inline void BrokerScanner::fill_slots_of_columns_from_path(int start, const std::vector& src_slot_descs, Tuple* tuple) { - // values of columns from path can not be null - for (int i = start; i < src_slot_descs.size(); ++i) { - auto slot_desc = src_slot_descs[i]; - tuple->set_not_null(slot_desc->null_indicator_offset()); - void* slot = tuple->get_slot(slot_desc->tuple_offset()); - StringValue* str_slot = reinterpret_cast(slot); - const std::string& column_from_path = _columns_from_path[i - start]; - str_slot->ptr = const_cast(column_from_path.c_str()); - str_slot->len = column_from_path.size(); - } -} - // Convert one row to this tuple bool BrokerScanner::line_to_src_tuple(const Slice& line) { @@ -498,7 +481,9 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) { split_line(line, &values); } - if (values.size() + _columns_from_path.size() < _src_slot_descs.size()) { + const TBrokerRangeDesc& range = _ranges[_next_range]; + const std::vector& columns_from_path = range.columns_from_path; + if (values.size() + columns_from_path.size() < _src_slot_descs.size()) { std::stringstream error_msg; error_msg << "actual column number is less than schema column number. " << "actual number: " << values.size() << " sep: " << _value_separator << ", " @@ -507,7 +492,7 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) { error_msg.str()); _counter->num_rows_filtered++; return false; - } else if (values.size() + _columns_from_path.size() > _src_slot_descs.size()) { + } else if (values.size() + columns_from_path.size() > _src_slot_descs.size()) { std::stringstream error_msg; error_msg << "actual column number is more than schema column number. 
" << "actual number: " << values.size() << " sep: " << _value_separator << ", " @@ -518,14 +503,13 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) { return false; } - int file_column_index = 0; - for (; file_column_index < values.size(); ++file_column_index) { - auto slot_desc = _src_slot_descs[file_column_index]; - const Slice& value = values[file_column_index]; + for (int i = 0; i < values.size(); ++i) { + auto slot_desc = _src_slot_descs[i]; + const Slice& value = values[i]; fill_slot(slot_desc, value); } - fill_slots_of_columns_from_path(file_column_index, _src_slot_descs, _src_tuple); + fill_slots_of_columns_from_path(range.num_of_columns_from_file, columns_from_path); return true; } diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h index 5c5118568334a4..30c4db0bed911d 100644 --- a/be/src/exec/broker_scanner.h +++ b/be/src/exec/broker_scanner.h @@ -105,7 +105,6 @@ class BrokerScanner : public BaseScanner { Status line_to_src_tuple(); void fill_slot(SlotDescriptor* slot_desc, const Slice& value); - void fill_slots_of_columns_from_path(int start, const std::vector& src_slot_descs, Tuple* tuple); bool line_to_src_tuple(const Slice& line); private:; const std::vector& _ranges; @@ -122,7 +121,6 @@ private:; Decompressor* _cur_decompressor; int _next_range; bool _cur_line_reader_eof; - std::vector _columns_from_path; bool _scanner_eof; diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/parquet_reader.cpp index bf9e17d48891dc..b3a2e388db0974 100644 --- a/be/src/exec/parquet_reader.cpp +++ b/be/src/exec/parquet_reader.cpp @@ -203,19 +203,6 @@ Status ParquetReaderWrap::handle_timestamp(const std::shared_ptr& src_slot_descs, Tuple* tuple) { - // values of columns from path can not be null - for (int i = start; i < src_slot_descs.size(); ++i) { - auto slot_desc = src_slot_descs[i]; - tuple->set_not_null(slot_desc->null_indicator_offset()); - void* slot = tuple->get_slot(slot_desc->tuple_offset()); - StringValue* str_slot = reinterpret_cast(slot); - const std::string& column_from_path = _columns_from_path[i - start]; - str_slot->ptr = const_cast(column_from_path.c_str()); - str_slot->len = column_from_path.size(); - } -} - Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& tuple_slot_descs, MemPool* mem_pool, bool* eof) { uint8_t tmp_buf[128] = {0}; int32_t wbytes = 0; @@ -418,10 +405,10 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& RETURN_IF_ERROR(set_field_null(tuple, slot_desc)); } else { time_t timestamp = (time_t)((int64_t)ts_array->Value(_current_line_of_group) * 24 * 60 * 60); - tm* local; - local = localtime(×tamp); + struct tm local; + localtime_r(×tamp, &local); char* to = reinterpret_cast(&tmp_buf); - wbytes = (uint32_t)strftime(to, 64, "%Y-%m-%d", local); + wbytes = (uint32_t)strftime(to, 64, "%Y-%m-%d", &local); fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes); } break; @@ -433,10 +420,10 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& } else { // convert milliseconds to seconds time_t timestamp = (time_t)((int64_t)ts_array->Value(_current_line_of_group) / 1000); - tm* local; - local = localtime(×tamp); + struct tm local; + localtime_r(×tamp, &local); char* to = reinterpret_cast(&tmp_buf); - wbytes = (uint32_t)strftime(to, 64, "%Y-%m-%d %H:%M:%S", local); + wbytes = (uint32_t)strftime(to, 64, "%Y-%m-%d %H:%M:%S", &local); fill_slot(tuple, slot_desc, mem_pool, tmp_buf, wbytes); } break; @@ -460,9 +447,6 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& return 
Status::InternalError(str_error.str()); } - ++column_index; - fill_slots_of_columns_from_path(column_index, tuple_slot_descs, tuple); - // update data value ++_current_line_of_group; return read_record_batch(tuple_slot_descs, eof); diff --git a/be/src/exec/parquet_reader.h b/be/src/exec/parquet_reader.h index 80162902c8e046..5e9726feaea5b5 100644 --- a/be/src/exec/parquet_reader.h +++ b/be/src/exec/parquet_reader.h @@ -83,7 +83,6 @@ class ParquetReaderWrap { Status set_field_null(Tuple* tuple, const SlotDescriptor* slot_desc); Status read_record_batch(const std::vector& tuple_slot_descs, bool* eof); Status handle_timestamp(const std::shared_ptr& ts_array, uint8_t *buf, int32_t *wbtyes); - void fill_slots_of_columns_from_path(int start, const std::vector& src_slot_descs, Tuple* tuple); private: const std::vector& _columns_from_path; diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index 0d4bcfdfeffcd3..a2ef35bbbd02f4 100644 --- a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -70,6 +70,8 @@ Status ParquetScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { _cur_file_eof = false; } RETURN_IF_ERROR(_cur_file_reader->read(_src_tuple, _src_slot_descs, tuple_pool, &_cur_file_eof)); + const TBrokerRangeDesc& range = _ranges[_next_range]; + fill_slots_of_columns_from_path(range.num_of_columns_from_file, range.columns_from_path); { COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); diff --git a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java index b9cd7643bf537b..103ba384cf5dcc 100644 --- a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -637,32 +637,34 @@ private TFileFormatType formatType(String fileFormat, String path) { // If fileFormat is not null, we use fileFormat instead of check file's suffix private void processFileGroup( - BrokerFileGroup fileGroup, - TBrokerScanRangeParams params, + ParamCreateContext context, List fileStatuses) throws UserException { if (fileStatuses == null || fileStatuses.isEmpty()) { return; } - TScanRangeLocations curLocations = newLocations(params, brokerDesc.getName()); + TScanRangeLocations curLocations = newLocations(context.params, brokerDesc.getName()); long curInstanceBytes = 0; long curFileOffset = 0; for (int i = 0; i < fileStatuses.size(); ) { TBrokerFileStatus fileStatus = fileStatuses.get(i); long leftBytes = fileStatus.size - curFileOffset; long tmpBytes = curInstanceBytes + leftBytes; - TFileFormatType formatType = formatType(fileGroup.getFileFormat(), fileStatus.path); - List columnsFromPath = parseColumnsFromPath(fileStatus.path, fileGroup.getColumnsFromPath()); + TFileFormatType formatType = formatType(context.fileGroup.getFileFormat(), fileStatus.path); + List columnsFromPath = parseColumnsFromPath(fileStatus.path, context.fileGroup.getColumnsFromPath()); + int numberOfColumnsFromFile = context.slotDescByName.size() - columnsFromPath.size(); if (tmpBytes > bytesPerInstance) { // Now only support split plain text if (formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable) { long rangeBytes = bytesPerInstance - curInstanceBytes; - TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType, rangeBytes, columnsFromPath); + TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType, + rangeBytes, columnsFromPath, 
numberOfColumnsFromFile); brokerScanRange(curLocations).addToRanges(rangeDesc); curFileOffset += rangeBytes; } else { - TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType, leftBytes, columnsFromPath); + TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType, + leftBytes, columnsFromPath, numberOfColumnsFromFile); brokerScanRange(curLocations).addToRanges(rangeDesc); curFileOffset = 0; i++; @@ -670,11 +672,12 @@ private void processFileGroup( // New one scan locationsList.add(curLocations); - curLocations = newLocations(params, brokerDesc.getName()); + curLocations = newLocations(context.params, brokerDesc.getName()); curInstanceBytes = 0; } else { - TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType, leftBytes, columnsFromPath); + TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType, + leftBytes, columnsFromPath, numberOfColumnsFromFile); brokerScanRange(curLocations).addToRanges(rangeDesc); curFileOffset = 0; curInstanceBytes += leftBytes; @@ -689,7 +692,8 @@ private void processFileGroup( } private TBrokerRangeDesc createBrokerRangeDesc(long curFileOffset, TBrokerFileStatus fileStatus, - TFileFormatType formatType, long rangeBytes, List columnsFromPath) { + TFileFormatType formatType, long rangeBytes, + List columnsFromPath, int numberOfColumnsFromFile) { TBrokerRangeDesc rangeDesc = new TBrokerRangeDesc(); rangeDesc.setFile_type(TFileType.FILE_BROKER); rangeDesc.setFormat_type(formatType); @@ -698,6 +702,7 @@ private TBrokerRangeDesc createBrokerRangeDesc(long curFileOffset, TBrokerFileSt rangeDesc.setStart_offset(curFileOffset); rangeDesc.setSize(rangeBytes); rangeDesc.setFile_size(fileStatus.size); + rangeDesc.setNum_of_columns_from_file(numberOfColumnsFromFile); rangeDesc.setColumns_from_path(columnsFromPath); return rangeDesc; } @@ -752,7 +757,7 @@ public void finalize(Analyzer analyzer) throws UserException { } catch (AnalysisException e) { throw new UserException(e.getMessage()); } - processFileGroup(context.fileGroup, context.params, fileStatuses); + processFileGroup(context, fileStatuses); } if (LOG.isDebugEnabled()) { for (TScanRangeLocations locations : locationsList) { diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 1bdabbb4e61bd3..22c4091260f84f 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -114,8 +114,10 @@ struct TBrokerRangeDesc { 7: optional Types.TUniqueId load_id // total size of the file 8: optional i64 file_size - // columns parsed from file path - 9: optional list columns_from_path + // number of columns from file + 9: optional i32 num_of_columns_from_file + // columns parsed from file path should be after the columns read from file + 10: optional list columns_from_path } struct TBrokerScanRangeParams { From 2ffaac0054554e53b622f3c38def38ebe4d3ef97 Mon Sep 17 00:00:00 2001 From: yuanlihan Date: Thu, 15 Aug 2019 00:38:54 +0800 Subject: [PATCH 09/14] Enable parsing columns from file path for Broker Load --- be/src/exec/broker_scanner.cpp | 25 +++++++++++-------------- be/src/exec/broker_scanner.h | 1 - be/src/exec/parquet_scanner.cpp | 3 ++- gensrc/thrift/PlanNodes.thrift | 2 +- 4 files changed, 14 insertions(+), 17 deletions(-) diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index 23e70ddfafb591..2fc7ff42294070 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -452,18 +452,6 @@ bool 
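Note: on the FE side the split is computed per file: the number of slots that must come from the file is the total number of source slots minus the number of columns declared in COLUMNS FROM PATH, and both values are written into the range descriptor via setNum_of_columns_from_file / setColumns_from_path as shown above. A rough sketch of that arithmetic with made-up counts; the real code works on slotDescByName and the values parsed from the path:

    import java.util.Arrays;
    import java.util.List;

    public class RangeDescCountsSketch {
        public static void main(String[] args) {
            int totalSrcSlots = 5;                       // stands in for slotDescByName.size()
            List<String> columnsFromPath = Arrays.asList("beijing", "2019-08-15");
            int numOfColumnsFromFile = totalSrcSlots - columnsFromPath.size();

            // The BE later filters a row unless
            //   values read from the file + columns from path == total source slots,
            // which is the check in BrokerScanner::line_to_src_tuple.
            System.out.println("num_of_columns_from_file = " + numOfColumnsFromFile); // 3
            System.out.println("columns_from_path        = " + columnsFromPath);
        }
    }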
BrokerScanner::convert_one_row( return fill_dest_tuple(line, tuple, tuple_pool); } -inline void BrokerScanner::fill_slot(SlotDescriptor* slot_desc, const Slice& value) { - if (slot_desc->is_nullable() && is_null(value)) { - _src_tuple->set_null(slot_desc->null_indicator_offset()); - return; - } - _src_tuple->set_not_null(slot_desc->null_indicator_offset()); - void* slot = _src_tuple->get_slot(slot_desc->tuple_offset()); - StringValue* str_slot = reinterpret_cast(slot); - str_slot->ptr = value.data; - str_slot->len = value.size; -} - // Convert one row to this tuple bool BrokerScanner::line_to_src_tuple(const Slice& line) { @@ -481,7 +469,8 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) { split_line(line, &values); } - const TBrokerRangeDesc& range = _ranges[_next_range]; + // range of current file + const TBrokerRangeDesc& range = _ranges.at(_next_range - 1); const std::vector& columns_from_path = range.columns_from_path; if (values.size() + columns_from_path.size() < _src_slot_descs.size()) { std::stringstream error_msg; @@ -506,7 +495,15 @@ bool BrokerScanner::line_to_src_tuple(const Slice& line) { for (int i = 0; i < values.size(); ++i) { auto slot_desc = _src_slot_descs[i]; const Slice& value = values[i]; - fill_slot(slot_desc, value); + if (slot_desc->is_nullable() && is_null(value)) { + _src_tuple->set_null(slot_desc->null_indicator_offset()); + continue; + } + _src_tuple->set_not_null(slot_desc->null_indicator_offset()); + void* slot = _src_tuple->get_slot(slot_desc->tuple_offset()); + StringValue* str_slot = reinterpret_cast(slot); + str_slot->ptr = value.data; + str_slot->len = value.size; } fill_slots_of_columns_from_path(range.num_of_columns_from_file, columns_from_path); diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h index 30c4db0bed911d..9fa8c824c072c9 100644 --- a/be/src/exec/broker_scanner.h +++ b/be/src/exec/broker_scanner.h @@ -104,7 +104,6 @@ class BrokerScanner : public BaseScanner { //Status init_expr_ctxes(); Status line_to_src_tuple(); - void fill_slot(SlotDescriptor* slot_desc, const Slice& value); bool line_to_src_tuple(const Slice& line); private:; const std::vector& _ranges; diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index a2ef35bbbd02f4..b4f8d470ca1217 100644 --- a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -70,7 +70,8 @@ Status ParquetScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { _cur_file_eof = false; } RETURN_IF_ERROR(_cur_file_reader->read(_src_tuple, _src_slot_descs, tuple_pool, &_cur_file_eof)); - const TBrokerRangeDesc& range = _ranges[_next_range]; + // range of current file + const TBrokerRangeDesc& range = _ranges.at(_next_range - 1); fill_slots_of_columns_from_path(range.num_of_columns_from_file, range.columns_from_path); { COUNTER_UPDATE(_rows_read_counter, 1); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 22c4091260f84f..6c0d5541e0caeb 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -115,7 +115,7 @@ struct TBrokerRangeDesc { // total size of the file 8: optional i64 file_size // number of columns from file - 9: optional i32 num_of_columns_from_file + 9: optional i32 num_of_columns_from_file = 0 // columns parsed from file path should be after the columns read from file 10: optional list columns_from_path } From 7176ed4e7047777101fa08af9e657ca43c23930f Mon Sep 17 00:00:00 2001 From: yuanlihan Date: Thu, 15 Aug 2019 14:56:40 +0800 Subject: [PATCH 10/14] Fix 
ut --- be/src/exec/base_scanner.cpp | 3 +++ be/test/exec/broker_scan_node_test.cpp | 2 ++ 2 files changed, 5 insertions(+) diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index 440d88f6b38331..330242d398dff2 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -181,6 +181,9 @@ bool BaseScanner::fill_dest_tuple(const Slice& line, Tuple* dest_tuple, MemPool* } void BaseScanner::fill_slots_of_columns_from_path(int start, const std::vector& columns_from_path) { + if (start <= 0) { + return; + } // values of columns from path can not be null for (int i = start; i < _src_slot_descs.size(); ++i) { auto slot_desc = _src_slot_descs[i]; diff --git a/be/test/exec/broker_scan_node_test.cpp b/be/test/exec/broker_scan_node_test.cpp index a83a4b1af71a58..44f302bf43aed8 100644 --- a/be/test/exec/broker_scan_node_test.cpp +++ b/be/test/exec/broker_scan_node_test.cpp @@ -422,6 +422,7 @@ TEST_F(BrokerScanNodeTest, normal) { range.splittable = true; std::vector columns_from_path{"1"}; range.__set_columns_from_path(columns_from_path); + range.__set_num_of_columns_from_file(3); broker_scan_range.ranges.push_back(range); scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range); @@ -443,6 +444,7 @@ TEST_F(BrokerScanNodeTest, normal) { range.splittable = true; std::vector columns_from_path{"2"}; range.__set_columns_from_path(columns_from_path); + range.__set_num_of_columns_from_file(3); broker_scan_range.ranges.push_back(range); scan_range_params.scan_range.__set_broker_scan_range(broker_scan_range); From 8b5061bfa1d9a5abcf67975cd7fa0ccd5c5bd72a Mon Sep 17 00:00:00 2001 From: yuanlihan Date: Thu, 15 Aug 2019 18:46:45 +0800 Subject: [PATCH 11/14] Add ut --- be/src/exec/parquet_reader.cpp | 17 ++- be/src/exec/parquet_reader.h | 4 +- be/src/exec/parquet_scanner.cpp | 2 +- .../apache/doris/common/util/BrokerUtil.java | 40 +++++++ .../apache/doris/planner/BrokerScanNode.java | 38 +------ .../doris/common/util/BrokerUtilTest.java | 107 ++++++++++++++++++ 6 files changed, 160 insertions(+), 48 deletions(-) create mode 100644 fe/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/parquet_reader.cpp index b3a2e388db0974..69181dd5975184 100644 --- a/be/src/exec/parquet_reader.cpp +++ b/be/src/exec/parquet_reader.cpp @@ -34,8 +34,8 @@ namespace doris { // Broker -ParquetReaderWrap::ParquetReaderWrap(FileReader *file_reader, const std::vector& columns_from_path) : - _columns_from_path(columns_from_path), _total_groups(0), _current_group(0), _rows_of_group(0), _current_line_of_group(0) { +ParquetReaderWrap::ParquetReaderWrap(FileReader *file_reader, int32_t num_of_columns_from_file) : + _num_of_columns_from_file(num_of_columns_from_file), _total_groups(0), _current_group(0), _rows_of_group(0), _current_line_of_group(0) { _parquet = std::shared_ptr(new ParquetFile(file_reader)); _properties = parquet::ReaderProperties(); _properties.enable_buffered_stream(); @@ -122,18 +122,17 @@ inline void ParquetReaderWrap::fill_slot(Tuple* tuple, SlotDescriptor* slot_desc Status ParquetReaderWrap::column_indices(const std::vector& tuple_slot_descs) { _parquet_column_ids.clear(); - for (auto slot_desc : tuple_slot_descs) { + for (int i = 0; i < _num_of_columns_from_file; i++) { + auto slot_desc = tuple_slot_descs.at(i); // Get the Column Reader for the boolean column auto iter = _map_column.find(slot_desc->col_name()); if (iter != _map_column.end()) { _parquet_column_ids.emplace_back(iter->second); } else { 
- if (std::find(_columns_from_path.begin(), _columns_from_path.end(), slot_desc->col_name()) != _columns_from_path.end()) { - std::stringstream str_error; - str_error << "Invalid Column Name:" << slot_desc->col_name(); - LOG(WARNING) << str_error.str(); - return Status::InvalidArgument(str_error.str()); - } + std::stringstream str_error; + str_error << "Invalid Column Name:" << slot_desc->col_name(); + LOG(WARNING) << str_error.str(); + return Status::InvalidArgument(str_error.str()); } } return Status::OK(); diff --git a/be/src/exec/parquet_reader.h b/be/src/exec/parquet_reader.h index 5e9726feaea5b5..2106c74ff937bf 100644 --- a/be/src/exec/parquet_reader.h +++ b/be/src/exec/parquet_reader.h @@ -68,7 +68,7 @@ class ParquetFile : public arrow::io::RandomAccessFile { // Reader of broker parquet file class ParquetReaderWrap { public: - ParquetReaderWrap(FileReader *file_reader, const std::vector& columns_from_path); + ParquetReaderWrap(FileReader *file_reader, int32_t num_of_columns_from_file); virtual ~ParquetReaderWrap(); // Read @@ -85,7 +85,7 @@ class ParquetReaderWrap { Status handle_timestamp(const std::shared_ptr& ts_array, uint8_t *buf, int32_t *wbtyes); private: - const std::vector& _columns_from_path; + const int32_t _num_of_columns_from_file; parquet::ReaderProperties _properties; std::shared_ptr _parquet; diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index b4f8d470ca1217..b0e385560f76db 100644 --- a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -144,7 +144,7 @@ Status ParquetScanner::open_next_reader() { file_reader->close(); continue; } - _cur_file_reader = new ParquetReaderWrap(file_reader.release(), range.columns_from_path); + _cur_file_reader = new ParquetReaderWrap(file_reader.release(), range.num_of_columns_from_file); Status status = _cur_file_reader->init_parquet_reader(_src_slot_descs); if (status.is_end_of_file()) { continue; diff --git a/fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java index a6cb66832c5369..bae07676e3cf45 100644 --- a/fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java +++ b/fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java @@ -17,6 +17,7 @@ package org.apache.doris.common.util; +import com.google.common.collect.Lists; import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.FsBroker; @@ -36,6 +37,7 @@ import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; +import java.util.Collections; import java.util.List; public class BrokerUtil { @@ -99,4 +101,42 @@ public static String printBroker(String brokerName, TNetworkAddress address) { return brokerName + "[" + address.toString() + "]"; } + public static List parseColumnsFromPath(String filePath, List columnsFromPath) throws UserException { + if (columnsFromPath == null || columnsFromPath.isEmpty()) { + return Collections.emptyList(); + } + String[] strings = filePath.split("/"); + if (strings.length < 2) { + throw new UserException("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath); + } + String[] columns = new String[columnsFromPath.size()]; + int size = 0; + for (int i = strings.length - 2; i >= 0; i--) { + String str = strings[i]; + if (str != null && str.isEmpty()) { + continue; + } + if (str == null || !str.contains("=")) { + throw new UserException("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", 
filePath: " + filePath); + } + String[] pair = str.split("=", 2); + if (pair.length != 2) { + throw new UserException("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath); + } + int index = columnsFromPath.indexOf(pair[0]); + if (index == -1) { + continue; + } + columns[index] = pair[1]; + size++; + if (size >= columnsFromPath.size()) { + break; + } + } + if (size != columnsFromPath.size()) { + throw new UserException("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath); + } + return Lists.newArrayList(columns); + } + } diff --git a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java index 103ba384cf5dcc..d3a7c45d5be605 100644 --- a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -652,7 +652,8 @@ private void processFileGroup( long leftBytes = fileStatus.size - curFileOffset; long tmpBytes = curInstanceBytes + leftBytes; TFileFormatType formatType = formatType(context.fileGroup.getFileFormat(), fileStatus.path); - List columnsFromPath = parseColumnsFromPath(fileStatus.path, context.fileGroup.getColumnsFromPath()); + List columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path, + context.fileGroup.getColumnsFromPath()); int numberOfColumnsFromFile = context.slotDescByName.size() - columnsFromPath.size(); if (tmpBytes > bytesPerInstance) { // Now only support split plain text @@ -707,41 +708,6 @@ private TBrokerRangeDesc createBrokerRangeDesc(long curFileOffset, TBrokerFileSt return rangeDesc; } - private List parseColumnsFromPath(String filePath, List columnsFromPath) throws UserException { - if (columnsFromPath == null || columnsFromPath.isEmpty()) { - return Collections.emptyList(); - } - String[] strings = filePath.split("/"); - if (strings.length < 2) { - throw new UserException("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath); - } - String[] columns = new String[columnsFromPath.size()]; - int size = 0; - for (int i = strings.length - 2; i >= 0; i--) { - String str = strings[i]; - if (str == null || str.isEmpty() || !str.contains("=")) { - throw new UserException("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath); - } - String[] pair = str.split("="); - if (pair.length != 2) { - throw new UserException("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath); - } - int index = columnsFromPath.indexOf(pair[0]); - if (index == -1) { - continue; - } - columns[index] = pair[1]; - size++; - if (size >= columnsFromPath.size()) { - break; - } - } - if (size != columnsFromPath.size()) { - throw new UserException("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath); - } - return Lists.newArrayList(columns); - } - @Override public void finalize(Analyzer analyzer) throws UserException { locationsList = Lists.newArrayList(); diff --git a/fe/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java b/fe/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java new file mode 100644 index 00000000000000..012507acb57770 --- /dev/null +++ b/fe/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.util; + +import com.google.common.collect.Lists; +import org.apache.doris.common.UserException; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.*; + +public class BrokerUtilTest { + + @Test + public void parseColumnsFromPath() { + String path = "/path/to/dir/k1=v1/xxx.csv"; + try { + List columns = BrokerUtil.parseColumnsFromPath(path, Collections.singletonList("k1")); + assertEquals(1, columns.size()); + assertEquals(Collections.singletonList("v1"), columns); + } catch (UserException e) { + fail(); + } + + path = "/path/to/dir/k1/xxx.csv"; + try { + List columns = BrokerUtil.parseColumnsFromPath(path, Collections.singletonList("k1")); + fail(); + } catch (UserException ignored) { + } + + path = "/path/to/dir/k1=v1/xxx.csv"; + try { + List columns = BrokerUtil.parseColumnsFromPath(path, Collections.singletonList("k2")); + fail(); + } catch (UserException ignored) { + } + + path = "/path/to/dir/k1=v2/k1=v1/xxx.csv"; + try { + List columns = BrokerUtil.parseColumnsFromPath(path, Collections.singletonList("k1")); + assertEquals(1, columns.size()); + assertEquals(Collections.singletonList("v1"), columns); + } catch (UserException e) { + fail(); + } + + path = "/path/to/dir/k2=v2/k1=v1/xxx.csv"; + try { + List columns = BrokerUtil.parseColumnsFromPath(path, Lists.newArrayList("k1", "k2")); + assertEquals(2, columns.size()); + assertEquals(Lists.newArrayList("v1", "v2"), columns); + } catch (UserException e) { + fail(); + } + + path = "/path/to/dir/k2=v2/a/k1=v1/xxx.csv"; + try { + List columns = BrokerUtil.parseColumnsFromPath(path, Lists.newArrayList("k1", "k2")); + fail(); + } catch (UserException ignored) { + } + + path = "/path/to/dir/k2=v2/k1=v1/xxx.csv"; + try { + List columns = BrokerUtil.parseColumnsFromPath(path, Lists.newArrayList("k1", "k2", "k3")); + fail(); + } catch (UserException ignored) { + } + + path = "/path/to/dir/k2=v2//k1=v1//xxx.csv"; + try { + List columns = BrokerUtil.parseColumnsFromPath(path, Lists.newArrayList("k1", "k2")); + assertEquals(2, columns.size()); + assertEquals(Lists.newArrayList("v1", "v2"), columns); + } catch (UserException e) { + fail(); + } + + path = "/path/to/dir/k2==v2=//k1=v1//xxx.csv"; + try { + List columns = BrokerUtil.parseColumnsFromPath(path, Lists.newArrayList("k1", "k2")); + assertEquals(2, columns.size()); + assertEquals(Lists.newArrayList("v1", "=v2="), columns); + } catch (UserException e) { + fail(); + } + + } +} \ No newline at end of file From 7c7c54c37ff1cbd467560712bc4f718c13033245 Mon Sep 17 00:00:00 2001 From: yuanlihan Date: Thu, 15 Aug 2019 19:21:02 +0800 Subject: [PATCH 12/14] Fix ut --- be/test/exec/parquet_scanner_test.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/be/test/exec/parquet_scanner_test.cpp 
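Note: a small usage sketch for the new BrokerUtil.parseColumnsFromPath helper, assuming the FE classes above are on the classpath; the file path below is made up for illustration. Directory segments are scanned from right to left starting at the parent of the file name, empty segments are skipped, each key=value segment is split on the first '=' only, and the result is ordered by the requested column list rather than by directory order:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.doris.common.UserException;
    import org.apache.doris.common.util.BrokerUtil;

    public class ParseColumnsFromPathSketch {
        public static void main(String[] args) throws UserException {
            // Hypothetical partitioned layout: .../city=.../utc_date=.../<file>
            String path = "/user/palo/data/city=beijing/utc_date=2019-08-15/file1.csv";
            List<String> parsed = BrokerUtil.parseColumnsFromPath(path, Arrays.asList("city", "utc_date"));
            System.out.println(parsed); // expected: [beijing, 2019-08-15]
        }
    }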
b/be/test/exec/parquet_scanner_test.cpp index a6d1911de02d79..81ff2e1c0588bf 100644 --- a/be/test/exec/parquet_scanner_test.cpp +++ b/be/test/exec/parquet_scanner_test.cpp @@ -438,6 +438,7 @@ TEST_F(ParquetSannerTest, normal) { std::vector<std::string> columns_from_path{"value"}; range.__set_columns_from_path(columns_from_path); + range.__set_num_of_columns_from_file(19); #if 1 range.path = "./be/test/exec/test_data/parquet_scanner/localfile.parquet"; range.file_type = TFileType::FILE_LOCAL; From 067b4c55e8e538422d78a38d8a468f5fe7536c79 Mon Sep 17 00:00:00 2001 From: yuanlihan Date: Sun, 18 Aug 2019 23:55:02 +0800 Subject: [PATCH 13/14] Remove redundant persistence --- be/src/exec/base_scanner.cpp | 9 +++---- .../doris/analysis/DataDescription.java | 9 ++++--- .../apache/doris/load/BrokerFileGroup.java | 24 ------------------- .../main/java/org/apache/doris/load/Load.java | 6 ++--- .../doris/common/util/BrokerUtilTest.java | 9 ++++++- 5 files changed, 20 insertions(+), 37 deletions(-) diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index 330242d398dff2..7b770258fb2b97 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -181,16 +181,13 @@ bool BaseScanner::fill_dest_tuple(const Slice& line, Tuple* dest_tuple, MemPool* } void BaseScanner::fill_slots_of_columns_from_path(int start, const std::vector<std::string>& columns_from_path) { - if (start <= 0) { - return; - } // values of columns from path can not be null - for (int i = start; i < _src_slot_descs.size(); ++i) { - auto slot_desc = _src_slot_descs[i]; + for (int i = 0; i < columns_from_path.size(); ++i) { + auto slot_desc = _src_slot_descs.at(i + start); _src_tuple->set_not_null(slot_desc->null_indicator_offset()); void* slot = _src_tuple->get_slot(slot_desc->tuple_offset()); StringValue* str_slot = reinterpret_cast<StringValue*>(slot); - const std::string& column_from_path = columns_from_path[i - start]; + const std::string& column_from_path = columns_from_path[i]; str_slot->ptr = const_cast<char*>(column_from_path.c_str()); str_slot->len = column_from_path.size(); } diff --git a/fe/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/src/main/java/org/apache/doris/analysis/DataDescription.java index 45da205c4771ca..c75809be124636 100644 --- a/fe/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -215,12 +215,15 @@ public boolean isHadoopLoad() { * "col2": "tmp_col2+1", "col3": "strftime("%Y-%m-%d %H:%M:%S", tmp_col3)"} */ private void analyzeColumns() throws AnalysisException { + if (columns == null && columnsFromPath != null) { + throw new AnalysisException("Can not specify columns_from_path without column_list"); + } List<String> columnList = Lists.newArrayList(); if (columns != null) { columnList.addAll(columns); - } - if (columnsFromPath != null) { - columnList.addAll(columnsFromPath); + if (columnsFromPath != null) { + columnList.addAll(columnsFromPath); + } } if (columnList.isEmpty()) { return; diff --git a/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java index 13f92d5e4e2b2d..bf69f4a76cdaed 100644 --- a/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java +++ b/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java @@ -290,15 +290,6 @@ public void write(DataOutput out) throws IOException { out.writeBoolean(true); Text.writeString(out, fileFormat); } - // columnsFromPath - if (columnsFromPath == null) { - out.writeInt(0); - } else { -
out.writeInt(columnsFromPath.size()); - for (String name : columnsFromPath) { - Text.writeString(out, name); - } - } } @@ -352,16 +343,6 @@ public void readFields(DataInput in) throws IOException { fileFormat = Text.readString(in); } } - // columnsFromPath - if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_59) { - int fileFieldNameSize = in.readInt(); - if (fileFieldNameSize > 0) { - columnsFromPath = Lists.newArrayList(); - for (int i = 0; i < fileFieldNameSize; ++i) { - columnsFromPath.add(Text.readString(in)); - } - } - } // There are no columnExprList in the previous load job which is created before function is supported. // The columnExprList could not be analyzed without origin stmt in the previous load job. @@ -374,11 +355,6 @@ public void readFields(DataInput in) throws IOException { for (String columnName : fileFieldNames) { columnExprList.add(new ImportColumnDesc(columnName, null)); } - if (columnsFromPath != null && !columnsFromPath.isEmpty()) { - for (String columnName : columnsFromPath) { - columnExprList.add(new ImportColumnDesc(columnName, null)); - } - } if (exprColumnMap == null || exprColumnMap.isEmpty()) { return; } diff --git a/fe/src/main/java/org/apache/doris/load/Load.java b/fe/src/main/java/org/apache/doris/load/Load.java index dadb1ee1354a7f..186d8324ca5560 100644 --- a/fe/src/main/java/org/apache/doris/load/Load.java +++ b/fe/src/main/java/org/apache/doris/load/Load.java @@ -655,9 +655,9 @@ public static void checkAndCreateSource(Database db, DataDescription dataDescrip List<String> assignColumnNames = Lists.newArrayList(); if (dataDescription.getColumnNames() != null) { assignColumnNames.addAll(dataDescription.getColumnNames()); - } - if (dataDescription.getColumnsFromPath() != null) { - assignColumnNames.addAll(dataDescription.getColumnsFromPath()); + if (dataDescription.getColumnsFromPath() != null) { + assignColumnNames.addAll(dataDescription.getColumnsFromPath()); + } } if (assignColumnNames.isEmpty()) { // use table columns diff --git a/fe/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java b/fe/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java index 012507acb57770..f8f1815142f21f 100644 --- a/fe/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java +++ b/fe/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java @@ -103,5 +103,12 @@ public void parseColumnsFromPath() { fail(); } + path = "/path/to/dir/k2==v2=//k1=v1/"; + try { + List<String> columns = BrokerUtil.parseColumnsFromPath(path, Lists.newArrayList("k1", "k2")); + fail(); + } catch (UserException ignored) { + } + } -} \ No newline at end of file +} From 25c4bb0f8fd46598a81d67f9df839c1e04fabef5 Mon Sep 17 00:00:00 2001 From: yuanlihan Date: Mon, 19 Aug 2019 01:04:42 +0800 Subject: [PATCH 14/14] Remove redundant meta_version --- fe/src/main/java/org/apache/doris/common/FeConstants.java | 2 +- fe/src/main/java/org/apache/doris/common/FeMetaVersion.java | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/fe/src/main/java/org/apache/doris/common/FeConstants.java b/fe/src/main/java/org/apache/doris/common/FeConstants.java index 1c87c9926f49d9..95683e20ac3693 100644 --- a/fe/src/main/java/org/apache/doris/common/FeConstants.java +++ b/fe/src/main/java/org/apache/doris/common/FeConstants.java @@ -35,5 +35,5 @@ public class FeConstants { // general model // Current meta data version.
Use this version to write journals and image - public static int meta_version = FeMetaVersion.VERSION_59; + public static int meta_version = FeMetaVersion.VERSION_58; } diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java index 01482f74ca1d0a..b5bdb2a42d9edd 100644 --- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -126,6 +126,4 @@ public final class FeMetaVersion { public static final int VERSION_57 = 57; // broker load support function, persist origin stmt in broker load public static final int VERSION_58 = 58; - // broker load support parsing columns from file path - public static final int VERSION_59 = 59; }
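
For readers who want to try the path-parsing rule outside of Doris, below is a minimal, self-contained Java sketch of the behaviour exercised by the BrokerUtilTest cases above. The class and method names are illustrative only and are not part of these patches, and for brevity the sketch silently skips malformed path segments instead of raising UserException the way the real BrokerUtil helper does.

import java.util.Arrays;
import java.util.List;

// Illustrative sketch only; not part of the patch series.
public class ColumnsFromPathSketch {

    // Walk the directory segments right to left (skipping the file name itself),
    // split each "key=value" segment on the first '=', and collect the values
    // for the requested column names in the order the names were requested.
    static String[] parse(String filePath, List<String> columnsFromPath) {
        String[] segments = filePath.split("/");
        String[] values = new String[columnsFromPath.size()];
        int found = 0;
        for (int i = segments.length - 2; i >= 0 && found < values.length; i--) {
            String segment = segments[i];
            if (segment.isEmpty() || !segment.contains("=")) {
                // assumption: empty segments ("//") are skipped; the real helper
                // throws UserException for non-empty segments without '='
                continue;
            }
            String[] pair = segment.split("=", 2);
            int index = columnsFromPath.indexOf(pair[0]);
            if (index >= 0 && values[index] == null) {
                // the rightmost occurrence of a key wins, matching the k1=v2/k1=v1 test case
                values[index] = pair[1];
                found++;
            }
        }
        return values;
    }

    public static void main(String[] args) {
        // Prints [beijing, 2019-08-15]: values come from the directory names, not the file.
        String path = "/user/palo/city=beijing/date=2019-08-15/data.csv";
        System.out.println(Arrays.toString(parse(path, Arrays.asList("city", "date"))));
    }
}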