From 7d346e96688c4237bc7cf5076e148b2e97cf4405 Mon Sep 17 00:00:00 2001 From: Tao Yin <373141588@qq.com> Date: Tue, 26 Apr 2022 15:15:08 +0800 Subject: [PATCH 1/3] [feature-wip](parquet-vec) Support parquet scanner in vectorized engine --- be/src/exec/base_scanner.cpp | 17 + be/src/exec/base_scanner.h | 6 + be/src/exec/broker_scan_node.cpp | 13 +- be/src/exec/parquet_reader.cpp | 18 + be/src/exec/parquet_reader.h | 4 + be/src/exec/parquet_scanner.h | 4 +- be/src/http/action/stream_load.cpp | 2 + be/src/runtime/descriptors.cpp | 2 +- be/src/vec/CMakeLists.txt | 4 +- be/src/vec/exec/vparquet_scanner.cpp | 310 ++++++++++++++++++ be/src/vec/exec/vparquet_scanner.h | 69 ++++ be/src/vec/functions/function_cast.h | 3 +- .../utils/arrow_column_to_doris_column.cpp | 279 ++++++++++++++++ .../vec/utils/arrow_column_to_doris_column.h | 40 +++ .../org/apache/doris/task/StreamLoadTask.java | 3 + 15 files changed, 766 insertions(+), 8 deletions(-) create mode 100644 be/src/vec/exec/vparquet_scanner.cpp create mode 100644 be/src/vec/exec/vparquet_scanner.h create mode 100644 be/src/vec/utils/arrow_column_to_doris_column.cpp create mode 100644 be/src/vec/utils/arrow_column_to_doris_column.h diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index 06331023e833c4..687c89e11416e1 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -110,6 +110,11 @@ Status BaseScanner::init_expr_ctxes() { Expr::create_expr_trees(_state->obj_pool(), _pre_filter_texprs, &_pre_filter_ctxs)); RETURN_IF_ERROR(Expr::prepare(_pre_filter_ctxs, _state, *_row_desc, _mem_tracker)); RETURN_IF_ERROR(Expr::open(_pre_filter_ctxs, _state)); + // vec + RETURN_IF_ERROR( + vectorized::VExpr::create_expr_trees(_state->obj_pool(), _pre_filter_texprs, &_pre_filter_vctxs)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_pre_filter_vctxs, _state, *_row_desc, _mem_tracker)); + RETURN_IF_ERROR(vectorized::VExpr::open(_pre_filter_vctxs, _state)); } // Construct dest slots information @@ -137,6 +142,12 @@ Status BaseScanner::init_expr_ctxes() { RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get(), _mem_tracker)); RETURN_IF_ERROR(ctx->open(_state)); _dest_expr_ctx.emplace_back(ctx); + // vec + vectorized::VExprContext* vctx = nullptr; + RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(_state->obj_pool(), it->second, &vctx)); + RETURN_IF_ERROR(vctx->prepare(_state, *_row_desc.get(), _mem_tracker)); + RETURN_IF_ERROR(vctx->open(_state)); + _dest_vexpr_ctxs.emplace_back(vctx); if (has_slot_id_map) { auto it = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id()); if (it == std::end(_params.dest_sid_to_src_sid_without_trans)) { @@ -272,6 +283,12 @@ void BaseScanner::close() { if (!_pre_filter_ctxs.empty()) { Expr::close(_pre_filter_ctxs, _state); } + if (!_pre_filter_vctxs.empty()) { + vectorized::VExpr::close(_pre_filter_vctxs, _state); + } + if (!_dest_vexpr_ctxs.empty()) { + vectorized::VExpr::close(_dest_vexpr_ctxs, _state); + } } } // namespace doris diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index 7dc398c40a9aa7..0d729a772bb763 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -20,6 +20,7 @@ #include "common/status.h" #include "exprs/expr.h" +#include "vec/exprs/vexpr.h" #include "runtime/tuple.h" #include "util/runtime_profile.h" @@ -34,6 +35,7 @@ class RuntimeState; class ExprContext; namespace vectorized { +class VExprContext; class IColumn; using MutableColumnPtr = IColumn::MutablePtr; } @@ -94,6 +96,8 @@ class BaseScanner { // Dest 
tuple descriptor and dest expr context const TupleDescriptor* _dest_tuple_desc; std::vector _dest_expr_ctx; + // for vec + std::vector _dest_vexpr_ctxs; // the map values of dest slot id to src slot desc // if there is not key of dest slot id in dest_sid_to_src_sid_without_trans, it will be set to nullptr std::vector _src_slot_descs_order_by_dest; @@ -103,6 +107,8 @@ class BaseScanner { // and will be converted to `_pre_filter_ctxs` when scanner is open. const std::vector _pre_filter_texprs; std::vector _pre_filter_ctxs; + // for vec + std::vector _pre_filter_vctxs; bool _strict_mode; diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp index ce450745a7e29a..ce642efc4cfb71 100644 --- a/be/src/exec/broker_scan_node.cpp +++ b/be/src/exec/broker_scan_node.cpp @@ -22,6 +22,7 @@ #include "common/object_pool.h" #include "vec/exec/vbroker_scanner.h" +#include "vec/exec/vparquet_scanner.h" #include "exec/json_scanner.h" #include "exec/orc_scanner.h" #include "exec/parquet_scanner.h" @@ -223,9 +224,15 @@ std::unique_ptr BrokerScanNode::create_scanner(const TBrokerScanRan BaseScanner* scan = nullptr; switch (scan_range.ranges[0].format_type) { case TFileFormatType::FORMAT_PARQUET: - scan = new ParquetScanner(_runtime_state, runtime_profile(), scan_range.params, - scan_range.ranges, scan_range.broker_addresses, - _pre_filter_texprs, counter); + if (_vectorized) { + scan = new vectorized::VParquetScanner(_runtime_state, runtime_profile(), scan_range.params, + scan_range.ranges, scan_range.broker_addresses, + _pre_filter_texprs, counter); + } else { + scan = new ParquetScanner(_runtime_state, runtime_profile(), scan_range.params, + scan_range.ranges, scan_range.broker_addresses, + _pre_filter_texprs, counter); + } break; case TFileFormatType::FORMAT_ORC: scan = new ORCScanner(_runtime_state, runtime_profile(), scan_range.params, diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/parquet_reader.cpp index ddb3531e176f7e..dbd63ec92e46ba 100644 --- a/be/src/exec/parquet_reader.cpp +++ b/be/src/exec/parquet_reader.cpp @@ -219,6 +219,24 @@ Status ParquetReaderWrap::read_record_batch(const std::vector& return Status::OK(); } +Status ParquetReaderWrap::next_batch(std::shared_ptr* batch, + const std::vector& tuple_slot_descs, + bool* eof) { + if (_batch->num_rows() == 0 || + _current_line_of_batch != 0 || + _current_line_of_group != 0) { + RETURN_IF_ERROR(read_record_batch(tuple_slot_descs, eof)); + } + *batch = get_batch(); + return Status::OK(); +} + +const std::shared_ptr& ParquetReaderWrap::get_batch() { + _current_line_of_batch += _batch->num_rows(); + _current_line_of_group += _batch->num_rows(); + return _batch; +} + Status ParquetReaderWrap::handle_timestamp(const std::shared_ptr& ts_array, uint8_t* buf, int32_t* wbytes) { const auto type = std::static_pointer_cast(ts_array->type()); diff --git a/be/src/exec/parquet_reader.h b/be/src/exec/parquet_reader.h index 2bd5b5a8021141..7bf3f235dfed77 100644 --- a/be/src/exec/parquet_reader.h +++ b/be/src/exec/parquet_reader.h @@ -79,6 +79,9 @@ class ParquetReaderWrap { Status size(int64_t* size); Status init_parquet_reader(const std::vector& tuple_slot_descs, const std::string& timezone); + Status next_batch(std::shared_ptr* batch, + const std::vector& tuple_slot_descs, + bool* eof); private: void fill_slot(Tuple* tuple, SlotDescriptor* slot_desc, MemPool* mem_pool, const uint8_t* value, @@ -86,6 +89,7 @@ class ParquetReaderWrap { Status column_indices(const std::vector& tuple_slot_descs); Status set_field_null(Tuple* tuple, 
const SlotDescriptor* slot_desc); Status read_record_batch(const std::vector& tuple_slot_descs, bool* eof); + const std::shared_ptr& get_batch(); Status handle_timestamp(const std::shared_ptr& ts_array, uint8_t* buf, int32_t* wbtyes); diff --git a/be/src/exec/parquet_scanner.h b/be/src/exec/parquet_scanner.h index 1e54c05d9362df..a34e4a19e6b233 100644 --- a/be/src/exec/parquet_scanner.h +++ b/be/src/exec/parquet_scanner.h @@ -65,11 +65,11 @@ class ParquetScanner : public BaseScanner { // Close this scanner virtual void close(); -private: +protected: // Read next buffer from reader Status open_next_reader(); -private: +protected: //const TBrokerScanRangeParams& _params; const std::vector& _ranges; const std::vector& _broker_addresses; diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 65b0edcf3aaa3b..2706968b25b5c3 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -98,6 +98,8 @@ static TFileFormatType::type parse_format(const std::string& format_str, if (compress_type.empty()) { format_type = TFileFormatType::FORMAT_JSON; } + } else if (boost::iequals(format_str, "PARQUET")) { + format_type = TFileFormatType::FORMAT_PARQUET; } return format_type; } diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp index 0ed1e531c11a5a..5ea975164b70d9 100644 --- a/be/src/runtime/descriptors.cpp +++ b/be/src/runtime/descriptors.cpp @@ -99,7 +99,7 @@ vectorized::DataTypePtr SlotDescriptor::get_data_type_ptr() const { std::string SlotDescriptor::debug_string() const { std::stringstream out; - out << "Slot(id=" << _id << " type=" << _type << " col=" << _col_pos + out << "Slot(id=" << _id << " type=" << _type << " col=" << _col_pos << " col_name=" << _col_name << " offset=" << _tuple_offset << " null=" << _null_indicator_offset.debug_string() << ")"; return out.str(); } diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 5d1ca3035a6464..a5a6dfeb5888b9 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -100,6 +100,7 @@ set(VEC_FILES exec/vtable_function_node.cpp exec/vbroker_scan_node.cpp exec/vbroker_scanner.cpp + exec/vparquet_scanner.cpp exec/join/vhash_join_node.cpp exprs/vectorized_agg_fn.cpp exprs/vectorized_fn_call.cpp @@ -193,7 +194,8 @@ set(VEC_FILES runtime/vsorted_run_merger.cpp runtime/vload_channel.cpp runtime/vload_channel_mgr.cpp - runtime/vtablets_channel.cpp) + runtime/vtablets_channel.cpp + utils/arrow_column_to_doris_column.cpp) add_library(Vec STATIC ${VEC_FILES} diff --git a/be/src/vec/exec/vparquet_scanner.cpp b/be/src/vec/exec/vparquet_scanner.cpp new file mode 100644 index 00000000000000..6fc7cb2a1f5eec --- /dev/null +++ b/be/src/vec/exec/vparquet_scanner.cpp @@ -0,0 +1,310 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/vparquet_scanner.h" +#include "exec/parquet_reader.h" +#include "exprs/expr.h" +#include "runtime/descriptors.h" +#include "runtime/exec_env.h" +#include "vec/data_types/data_type_factory.hpp" +#include "vec/functions/simple_function_factory.h" +#include "vec/utils/arrow_column_to_doris_column.h" + +namespace doris::vectorized { + +VParquetScanner::VParquetScanner(RuntimeState* state, RuntimeProfile* profile, + const TBrokerScanRangeParams& params, + const std::vector& ranges, + const std::vector& broker_addresses, + const std::vector& pre_filter_texprs, ScannerCounter* counter) + : ParquetScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter), + _batch(nullptr), + _arrow_batch_cur_idx(0), + _num_of_columns_from_file(0) {} +VParquetScanner::~VParquetScanner() { +} + +Status VParquetScanner::open() { + RETURN_IF_ERROR(ParquetScanner::open()); + if (_ranges.empty()) { + return Status::OK(); + } + auto range = _ranges[0]; + _num_of_columns_from_file = range.__isset.num_of_columns_from_file + ? implicit_cast(range.num_of_columns_from_file) + : implicit_cast(_src_slot_descs.size()); + + // check consistency + if (range.__isset.num_of_columns_from_file) { + int size = range.columns_from_path.size(); + for (const auto& r : _ranges) { + if (r.columns_from_path.size() != size) { + return Status::InternalError("ranges have different number of columns."); + } + } + } + return Status::OK(); +} + +// get next available arrow batch +Status VParquetScanner::next_arrow_batch() { + _arrow_batch_cur_idx = 0; + // first, init file reader + if (_cur_file_reader == nullptr || _cur_file_eof) { + RETURN_IF_ERROR(open_next_reader()); + _cur_file_eof = false; + } + // second, loop until find available arrow batch or EOF + while (!_scanner_eof) { + RETURN_IF_ERROR(_cur_file_reader->next_batch(&_batch, _src_slot_descs, &_cur_file_eof)); + if (_cur_file_eof) { + RETURN_IF_ERROR(open_next_reader()); + _cur_file_eof = false; + continue; + } + if (_batch->num_rows() == 0) { + continue; + } + return Status::OK(); + } + return Status::EndOfFile("EOF"); +} + +Status VParquetScanner::init_arrow_batch_if_necessary() { + // 1. init batch if first time + // 2. 
reset reader if end of file + if (_scanner_eof || _batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) { + while (!_scanner_eof) { + Status status = next_arrow_batch(); + if (_scanner_eof) { + return status; + } + if (status.is_end_of_file()) { + // try next file + continue; + } + return status; + } + } + return Status::OK(); +} + +Status VParquetScanner::init_src_block(Block* block) { + size_t batch_pos = 0; + for (auto i = 0; i < _num_of_columns_from_file; ++i) { + SlotDescriptor* slot_desc = _src_slot_descs[i]; + if (slot_desc == nullptr) { + continue; + } + auto* array = _batch->column(batch_pos++).get(); + auto pt = arrow_type_to_pt(array->type()->id()); + if (pt == INVALID_TYPE) { + return Status::NotSupported(fmt::format( + "Not support arrow type:{}", array->type()->name())); + } + auto is_nullable = true; + // make the src column nullable to simplify the conversion + DataTypePtr data_type = DataTypeFactory::instance().create_data_type(pt, is_nullable); + MutableColumnPtr data_column = data_type->create_column(); + block->insert(ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name())); + } + return Status::OK(); +} + +Status VParquetScanner::get_next(std::vector& columns, bool* eof) { + // overall type conversion flow: + // arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==> + // primitive type(PT1) ==materialize_block==> dest primitive type + SCOPED_TIMER(_read_timer); + // init arrow batch + { + Status st = init_arrow_batch_if_necessary(); + if (!st.ok()) { + if (!st.is_end_of_file()) { + return st; + } + return Status::OK(); + } + } + Block src_block; + RETURN_IF_ERROR(init_src_block(&src_block)); + // convert arrow batches into the block until it reaches batch_size + while (!_scanner_eof) { + // cast arrow type to PT0 and append it to src block + // for example: arrow::Type::INT16 => TYPE_SMALLINT + RETURN_IF_ERROR(append_batch_to_src_block(&src_block)); + // finalize the src block if full + if (src_block.rows() >= _state->batch_size()) { + break; + } + auto status = next_arrow_batch(); + // if ok, append the batch to the src columns + if (status.ok()) { + continue; + } + // return error if not EOF + if (!status.is_end_of_file()) { + return status; + } + // if src block is not empty, then finalize the block + if (src_block.rows() > 0) { + break; + } + _cur_file_eof = true; + RETURN_IF_ERROR(next_arrow_batch()); + // the next batch may come from a different file, so re-init the src block here + RETURN_IF_ERROR(init_src_block(&src_block)); + } + COUNTER_UPDATE(_rows_read_counter, src_block.rows()); + SCOPED_TIMER(_materialize_timer); + // cast PT0 => PT1 + // for example: TYPE_SMALLINT => TYPE_VARCHAR + RETURN_IF_ERROR(cast_src_block(&src_block)); + // fill columns parsed from the path of the current range + fill_columns_from_path(&src_block); + RETURN_IF_ERROR(eval_conjunts(&src_block)); + // materialize, src block => dest columns + RETURN_IF_ERROR(materialize_block(&src_block, columns)); + if (_scanner_eof) { + *eof = true; + } else { + *eof = false; + } + return Status::OK(); +} + +// eval conjuncts, for example: t1 > 1 +Status VParquetScanner::eval_conjunts(Block* block) { + for (auto& vctx : _pre_filter_vctxs) { + size_t orig_rows = block->rows(); + RETURN_IF_ERROR( + VExprContext::filter_block(vctx, block, block->columns())); + _counter->num_rows_unselected += orig_rows - block->rows(); + } + return Status::OK(); +} + +void VParquetScanner::fill_columns_from_path(Block* block) { + const TBrokerRangeDesc& range = _ranges.at(_next_range - 1); + if
(range.__isset.num_of_columns_from_file) { + int start = range.num_of_columns_from_file; + int rows = block->rows(); + for (int i = 0; i < range.columns_from_path.size(); ++i) { + auto slot_desc = _src_slot_descs.at(i + start); + if (slot_desc == nullptr) continue; + auto is_nullable = slot_desc->is_nullable(); + DataTypePtr data_type = DataTypeFactory::instance().create_data_type(TYPE_VARCHAR, is_nullable); + MutableColumnPtr data_column = data_type->create_column(); + const std::string& column_from_path = range.columns_from_path[i]; + for (size_t i = 0; i < rows; ++i) { + data_column->insert_data(const_cast(column_from_path.c_str()), column_from_path.size()); + } + block->insert(ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name())); + } + } +} + +Status VParquetScanner::materialize_block(Block* block, std::vector& columns) { + int ctx_idx = 0; + size_t orig_rows = block->rows(); + auto filter_column = ColumnUInt8::create(orig_rows, 1); + for (auto slot_desc : _dest_tuple_desc->slots()) { + if (!slot_desc->is_materialized()) { + continue; + } + int dest_index = ctx_idx++; + + VExprContext* ctx = _dest_vexpr_ctxs[dest_index]; + int result_column_id = 0; + // PT1 => dest primitive type + RETURN_IF_ERROR(ctx->execute(block, &result_column_id)); + ColumnPtr& ptr = block->safe_get_by_position(result_column_id).column; + if (!slot_desc->is_nullable()) { + if (auto* nullable_column = check_and_get_column(*ptr)) { + if (nullable_column->has_null()) { + // fill filter if src has null value and dest column is not nullable + IColumn::Filter& filter = assert_cast(*filter_column).get_data(); + const ColumnPtr& null_column_ptr = nullable_column->get_null_map_column_ptr(); + const auto& column_data = assert_cast(*null_column_ptr).get_data(); + for (size_t i = 0; i < null_column_ptr->size(); ++i) { + filter[i] &= !column_data[i]; + } + } + ptr = nullable_column->get_nested_column_ptr(); + } + } + columns[dest_index] = (*std::move(ptr)).mutate(); + } + const IColumn::Filter& filter = assert_cast(*filter_column).get_data(); + size_t after_filtered_rows = orig_rows; + for (size_t i = 0; i < columns.size(); ++i) { + columns[i] = (*std::move(columns[i]->filter(filter, 0))).mutate(); + after_filtered_rows = columns[i]->size(); + } + _counter->num_rows_filtered += orig_rows - after_filtered_rows; + return Status::OK(); +} + +// arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==> +// primitive type(PT1) ==materialize_block==> dest primitive type +Status VParquetScanner::cast_src_block(Block* block) { + // cast primitive type(PT0) to primitive type(PT1) + for (size_t i = 0; i < _num_of_columns_from_file; ++i) { + SlotDescriptor* slot_desc = _src_slot_descs[i]; + if (slot_desc == nullptr) { + continue; + } + auto& arg = block->get_by_name(slot_desc->col_name()); + // remove nullable here, let the get_function decide whether nullable + auto return_type = slot_desc->get_data_type_ptr(); + ColumnsWithTypeAndName arguments + { + arg, + { + DataTypeString().create_column_const(arg.column->size(), remove_nullable(return_type)->get_name()), + std::make_shared(), + "" + } + }; + auto func_cast = SimpleFunctionFactory::instance().get_function("CAST", arguments, return_type); + RETURN_IF_ERROR(func_cast->execute(nullptr, *block, {i}, i, arg.column->size())); + block->get_by_position(i).type = std::move(return_type); + } + return Status::OK(); +} + +Status VParquetScanner::append_batch_to_src_block(Block* block) { + size_t num_elements = + 
std::min((_state->batch_size() - block->rows()), (_batch->num_rows() - _arrow_batch_cur_idx)); + size_t column_pos = 0; + for (auto i = 0; i < _num_of_columns_from_file; ++i) { + SlotDescriptor* slot_desc = _src_slot_descs[i]; + if (slot_desc == nullptr) { + continue; + } + auto* array = _batch->column(column_pos++).get(); + auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name()); + RETURN_IF_ERROR(arrow_column_to_doris_column(array, _arrow_batch_cur_idx, column_with_type_and_name, num_elements, _state->timezone())); + } + + _arrow_batch_cur_idx += num_elements; + return Status::OK(); +} + + + +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exec/vparquet_scanner.h b/be/src/vec/exec/vparquet_scanner.h new file mode 100644 index 00000000000000..89a2bba332a8af --- /dev/null +++ b/be/src/vec/exec/vparquet_scanner.h @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include +#include "common/status.h" +#include +#include "gen_cpp/PlanNodes_types.h" +#include "gen_cpp/Types_types.h" +#include "runtime/mem_pool.h" +#include "util/runtime_profile.h" + +namespace doris::vectorized { + +// VParquetScanner converts the data read from Parquet files into Doris columns.
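+// The scanner loop in get_next() pulls an arrow::RecordBatch from the +// underlying ParquetReaderWrap, converts each arrow column into a nullable +// src column (PT0), casts the src columns to the dest slot types (PT1), then +// evaluates the pre-filters and materializes the dest columns. Batches are +// appended until the block reaches _state->batch_size() rows.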
+class VParquetScanner : public ParquetScanner { +public: + VParquetScanner(RuntimeState* state, RuntimeProfile* profile, + const TBrokerScanRangeParams& params, + const std::vector& ranges, + const std::vector& broker_addresses, + const std::vector& pre_filter_texprs, ScannerCounter* counter); + + virtual ~VParquetScanner(); + + // Open this scanner; initializes the information needed for reading + Status open(); + + Status get_next(std::vector& columns, bool* eof); + +private: + Status next_arrow_batch(); + Status init_arrow_batch_if_necessary(); + Status init_src_block(Block* block); + Status append_batch_to_src_block(Block* block); + Status cast_src_block(Block* block); + Status eval_conjunts(Block* block); + Status materialize_block(Block* block, std::vector& columns); + void fill_columns_from_path(Block* block); + +private: + std::shared_ptr _batch; + size_t _arrow_batch_cur_idx; + int _num_of_columns_from_file; +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/functions/function_cast.h b/be/src/vec/functions/function_cast.h index 144b782554c589..63ce52873374f9 100644 --- a/be/src/vec/functions/function_cast.h +++ b/be/src/vec/functions/function_cast.h @@ -966,8 +966,9 @@ class FunctionCast final : public IFunctionBase { !(check_and_get_data_type(from_type.get()) || check_and_get_data_type(from_type.get()))) { function = FunctionConvertToTimeType::create(); - } else + } else { function = FunctionTo::Type::create(); + } /// Check conversion using underlying function { function->get_return_type(ColumnsWithTypeAndName(1, {nullptr, from_type, ""})); } diff --git a/be/src/vec/utils/arrow_column_to_doris_column.cpp b/be/src/vec/utils/arrow_column_to_doris_column.cpp new file mode 100644 index 00000000000000..fb37e08b2d1477 --- /dev/null +++ b/be/src/vec/utils/arrow_column_to_doris_column.cpp @@ -0,0 +1,279 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
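+ +// Entry point: arrow_column_to_doris_column() fills the destination column's +// null map once via fill_nullable_column(), then dispatches on the arrow type +// id to a convert_column_with_*_data() routine that copies num_elements values +// starting at array_idx into the nested data column.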
+ +#include "vec/utils/arrow_column_to_doris_column.h" + +#include "vec/columns/column_nullable.h" +#include "vec/data_types/data_type_decimal.h" +#include "vec/runtime/vdatetime_value.h" + +#include +#include +#include + +#include "arrow/type.h" +#include "arrow/array/array_binary.h" +#include "arrow/array/array_nested.h" +#include "arrow/scalar.h" +#include "arrow/type_fwd.h" +#include "arrow/type_traits.h" + +#define FOR_ARROW_TYPES(M) \ + M(::arrow::Type::BOOL, TYPE_BOOLEAN) \ + M(::arrow::Type::INT8, TYPE_TINYINT) \ + M(::arrow::Type::UINT8, TYPE_TINYINT) \ + M(::arrow::Type::INT16, TYPE_SMALLINT) \ + M(::arrow::Type::UINT16, TYPE_SMALLINT) \ + M(::arrow::Type::INT32, TYPE_INT) \ + M(::arrow::Type::UINT32, TYPE_INT) \ + M(::arrow::Type::INT64, TYPE_BIGINT) \ + M(::arrow::Type::UINT64, TYPE_BIGINT) \ + M(::arrow::Type::HALF_FLOAT, TYPE_FLOAT) \ + M(::arrow::Type::FLOAT, TYPE_FLOAT) \ + M(::arrow::Type::DOUBLE, TYPE_DOUBLE) \ + M(::arrow::Type::BINARY, TYPE_VARCHAR) \ + M(::arrow::Type::FIXED_SIZE_BINARY, TYPE_VARCHAR) \ + M(::arrow::Type::STRING, TYPE_VARCHAR) \ + M(::arrow::Type::TIMESTAMP, TYPE_DATETIME) \ + M(::arrow::Type::DATE32, TYPE_DATE) \ + M(::arrow::Type::DATE64, TYPE_DATETIME) \ + M(::arrow::Type::DECIMAL, TYPE_DECIMALV2) + +#define FOR_ARROW_NUMERIC_TYPES(M) \ + M(arrow::Type::UINT8, UInt8) \ + M(arrow::Type::INT8, Int8) \ + M(arrow::Type::INT16, Int16) \ + M(arrow::Type::INT32, Int32) \ + M(arrow::Type::UINT64, UInt64) \ + M(arrow::Type::INT64, Int64) \ + M(arrow::Type::HALF_FLOAT, Float32) \ + M(arrow::Type::FLOAT, Float32) \ + M(arrow::Type::DOUBLE, Float64) + +namespace doris::vectorized { + +const PrimitiveType arrow_type_to_pt(::arrow::Type::type type) { + switch(type) { +# define DISPATCH(ARROW_TYPE, CPP_TYPE) \ + case ARROW_TYPE: \ + return CPP_TYPE; + FOR_ARROW_TYPES(DISPATCH) +# undef DISPATCH + default: + break; + } + return INVALID_TYPE; +} + +static size_t fill_nullable_column(const arrow::Array* array, size_t array_idx, vectorized::ColumnNullable* nullable_column, + size_t num_elements) { + size_t null_elements_count = 0; + NullMap& map_data = nullable_column->get_null_map_data(); + for (size_t i = 0; i < num_elements; ++i) { + auto is_null = array->IsNull(array_idx + i); + map_data.push_back(is_null); + null_elements_count += is_null; + } + return null_elements_count; +} + +/// Inserts chars and offsets right into internal column data to reduce an overhead. +/// Internal offsets are shifted by one to the right in comparison with Arrow ones. So the last offset should map to the end of all chars. +/// Also internal strings are null terminated. 
+static Status convert_column_with_string_data(const arrow::Array* array, size_t array_idx, + MutableColumnPtr& data_column, size_t num_elements) { + PaddedPODArray & column_chars_t = assert_cast(*data_column).get_chars(); + PaddedPODArray & column_offsets = assert_cast(*data_column).get_offsets(); + + const auto & concrete_array = dynamic_cast(*array); + std::shared_ptr buffer = concrete_array.value_data(); + + for (size_t offset_i = array_idx; offset_i < array_idx + num_elements; ++offset_i) { + if (!concrete_array.IsNull(offset_i) && buffer) { + const auto * raw_data = buffer->data() + concrete_array.value_offset(offset_i); + column_chars_t.insert(raw_data, raw_data + concrete_array.value_length(offset_i)); + } + column_chars_t.emplace_back('\0'); + + column_offsets.emplace_back(column_chars_t.size()); + } + return Status::OK(); +} + +static Status convert_column_with_fixed_size_data(const arrow::Array* array, size_t array_idx, + MutableColumnPtr& data_column, size_t num_elements) { + PaddedPODArray & column_chars_t = assert_cast(*data_column).get_chars(); + PaddedPODArray & column_offsets = assert_cast(*data_column).get_offsets(); + + const auto & concrete_array = dynamic_cast(*array); + uint32_t width = concrete_array.byte_width(); + const auto* array_data = concrete_array.GetValue(array_idx); + + for (size_t offset_i = 0; offset_i < num_elements; ++offset_i) { + // the null bitmap is indexed from the start of the array, + // so the check must be shifted by array_idx + if (!concrete_array.IsNull(array_idx + offset_i)) { + const auto * raw_data = array_data + (offset_i * width); + column_chars_t.insert(raw_data, raw_data + width); + } + column_chars_t.emplace_back('\0'); + column_offsets.emplace_back(column_chars_t.size()); + } + return Status::OK(); +} + +/// Inserts numeric data right into internal column data to reduce an overhead +template > +static Status convert_column_with_numeric_data(const arrow::Array* array, size_t array_idx, + MutableColumnPtr& data_column, size_t num_elements){ + auto & column_data = static_cast(*data_column).get_data(); + /// buffers[0] is a null bitmap and buffers[1] are actual values + std::shared_ptr buffer = array->data()->buffers[1]; + const auto * raw_data = reinterpret_cast(buffer->data()) + array_idx; + column_data.insert(raw_data, raw_data + num_elements); + return Status::OK(); +} + +static Status convert_column_with_boolean_data(const arrow::Array* array, size_t array_idx, + MutableColumnPtr& data_column, size_t num_elements){ + auto & column_data = static_cast &>(*data_column).get_data(); + const auto & concrete_array = dynamic_cast(*array); + for (size_t bool_i = array_idx; bool_i < array_idx + num_elements; ++bool_i) { + column_data.emplace_back(concrete_array.Value(bool_i)); + } + return Status::OK(); +} + +static int64_t time_unit_divisor(arrow::TimeUnit::type unit) { + // Doris only supports seconds + switch (unit) { + case arrow::TimeUnit::type::SECOND: { + return 1L; + } + case arrow::TimeUnit::type::MILLI: { + return 1000L; + } + case arrow::TimeUnit::type::MICRO: { + return 1000000L; + } + case arrow::TimeUnit::type::NANO: { + return 1000000000L; + } + default: + return 0L; + } +} + +template +static Status convert_column_with_timestamp_data(const arrow::Array* array, size_t array_idx, + MutableColumnPtr& data_column, size_t num_elements, + const std::string& timezone){ + auto & column_data = static_cast &>(*data_column).get_data(); + const auto & concrete_array = dynamic_cast(*array); + int64_t divisor = 1; + int64_t multiplier = 1; + if constexpr (std::is_same_v) { + const auto type = std::static_pointer_cast(array->type()); + divisor =
time_unit_divisor(type->unit()); + if (divisor == 0L) { + return Status::InternalError(fmt::format("Invalid Time Type:{}", type->name())); + } + } else if constexpr (std::is_same_v) { + multiplier = 24 * 60 * 60; // day => secs + } else if constexpr (std::is_same_v) { + divisor = 1000; // ms => secs + } + + for (size_t value_i = array_idx; value_i < array_idx + num_elements; ++value_i) { + VecDateTimeValue v; + v.from_unixtime(static_cast(concrete_array.Value(value_i)) / divisor * multiplier, timezone); + if constexpr (std::is_same_v) { + v.cast_to_date(); + } + column_data.emplace_back(binary_cast(v)); + } + return Status::OK(); +} + +static Status convert_column_with_decimal_data(const arrow::Array* array, size_t array_idx, + MutableColumnPtr& data_column, size_t num_elements){ + auto & column_data = static_cast &>(*data_column).get_data(); + const auto & concrete_array = dynamic_cast(*array); + const auto * arrow_decimal_type = static_cast(array->type().get()); + // TODO check precision + //size_t precision = arrow_decimal_type->precision(); + const auto scale = arrow_decimal_type->scale(); + + for (size_t value_i = array_idx; value_i < array_idx + num_elements; ++value_i) { + auto value = *reinterpret_cast(concrete_array.Value(value_i)); + // convert scale to 9 + if (scale != 9) { + value = convert_decimals, + vectorized::DataTypeDecimal>( + value, scale, 9); + } + column_data.emplace_back(value); + } + return Status::OK(); +} + +Status arrow_column_to_doris_column(const arrow::Array* arrow_column, + size_t arrow_batch_cur_idx, + ColumnWithTypeAndName& doris_column, + size_t num_elements, + const std::string& timezone) { + // the src column is always nullable to simplify the conversion + assert(doris_column.column->is_nullable()); + MutableColumnPtr data_column = nullptr; + if (doris_column.column->is_nullable()) { + auto* nullable_column = reinterpret_cast((*std::move(doris_column.column)).mutate().get()); + fill_nullable_column(arrow_column, arrow_batch_cur_idx, nullable_column, num_elements); + data_column = nullable_column->get_nested_column_ptr(); + } else { + data_column = (*std::move(doris_column.column)).mutate(); + } + // process data + switch (arrow_column->type()->id()) + { + case arrow::Type::STRING: + case arrow::Type::BINARY: + return convert_column_with_string_data(arrow_column, arrow_batch_cur_idx, data_column, num_elements); + case arrow::Type::FIXED_SIZE_BINARY: + return convert_column_with_fixed_size_data(arrow_column, arrow_batch_cur_idx, data_column, num_elements); +# define DISPATCH(ARROW_NUMERIC_TYPE, CPP_NUMERIC_TYPE) \ + case ARROW_NUMERIC_TYPE: \ + return convert_column_with_numeric_data(arrow_column, arrow_batch_cur_idx, data_column, num_elements); + FOR_ARROW_NUMERIC_TYPES(DISPATCH) +# undef DISPATCH + case arrow::Type::BOOL: + return convert_column_with_boolean_data(arrow_column, arrow_batch_cur_idx, data_column, num_elements); + case arrow::Type::DATE32: + return convert_column_with_timestamp_data( + arrow_column, arrow_batch_cur_idx, data_column, num_elements, timezone); + case arrow::Type::DATE64: + return convert_column_with_timestamp_data( + arrow_column, arrow_batch_cur_idx, data_column, num_elements, timezone); + case arrow::Type::TIMESTAMP: + return convert_column_with_timestamp_data( + arrow_column, arrow_batch_cur_idx, data_column, num_elements, timezone); + case arrow::Type::DECIMAL: + return convert_column_with_decimal_data(arrow_column, arrow_batch_cur_idx, data_column, num_elements); + default: + break; + } + return Status::NotSupported(fmt::format( +
"Not support arrow type:{}", arrow_column->type()->name())); +} +} // namespace doris diff --git a/be/src/vec/utils/arrow_column_to_doris_column.h b/be/src/vec/utils/arrow_column_to_doris_column.h new file mode 100644 index 00000000000000..72f2d3910edba7 --- /dev/null +++ b/be/src/vec/utils/arrow_column_to_doris_column.h @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include + +#include "common/status.h" +#include "runtime/primitive_type.h" +#include "vec/core/column_with_type_and_name.h" + +// This files contains some utilities to convert Doris internal +// data format from Apache Arrow format. +namespace doris::vectorized { + +const PrimitiveType arrow_type_to_pt(::arrow::Type::type type); + +Status arrow_column_to_doris_column(const arrow::Array* arrow_column, + size_t arrow_batch_cur_idx, + ColumnWithTypeAndName& doirs_column, + size_t num_elements, + const std::string& timezone); +} // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java index 248b7f127d7437..27bcb4f7f937ed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java @@ -255,6 +255,9 @@ private void setOptionalFromTSLPutRequest(TStreamLoadPutRequest request) throws case FILE_STREAM: path = request.getPath(); break; + case FILE_LOCAL: + path = request.getPath(); + break; default: throw new UserException("unsupported file type, type=" + request.getFileType()); } From 4f815d2f8c690757e69904bb581b339f288d328d Mon Sep 17 00:00:00 2001 From: Tao Yin <373141588@qq.com> Date: Tue, 26 Apr 2022 21:10:53 +0800 Subject: [PATCH 2/3] [feature-wip](parquet-vec) Support parquet scanner in vectorized engine --- be/src/http/action/stream_load.cpp | 36 +++++++++---------- be/src/vec/exec/vparquet_scanner.cpp | 13 +++---- .../utils/arrow_column_to_doris_column.cpp | 4 +-- .../vec/utils/arrow_column_to_doris_column.h | 2 +- 4 files changed, 26 insertions(+), 29 deletions(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index 2706968b25b5c3..c11c7b01933c96 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -77,28 +77,28 @@ static TFileFormatType::type parse_format(const std::string& format_str, return parse_format("CSV", compress_type); } TFileFormatType::type format_type = TFileFormatType::FORMAT_UNKNOWN; - if (boost::iequals(format_str, "CSV")) { + if (iequal(format_str, "CSV")) { if (compress_type.empty()) { format_type = TFileFormatType::FORMAT_CSV_PLAIN; } - if (boost::iequals(compress_type, "GZ")) { + if (iequal(compress_type, "GZ")) { 
format_type = TFileFormatType::FORMAT_CSV_GZ; - } else if (boost::iequals(compress_type, "LZO")) { + } else if (iequal(compress_type, "LZO")) { format_type = TFileFormatType::FORMAT_CSV_LZO; - } else if (boost::iequals(compress_type, "BZ2")) { + } else if (iequal(compress_type, "BZ2")) { format_type = TFileFormatType::FORMAT_CSV_BZ2; - } else if (boost::iequals(compress_type, "LZ4FRAME")) { + } else if (iequal(compress_type, "LZ4FRAME")) { format_type = TFileFormatType::FORMAT_CSV_LZ4FRAME; - } else if (boost::iequals(compress_type, "LZOP")) { + } else if (iequal(compress_type, "LZOP")) { format_type = TFileFormatType::FORMAT_CSV_LZOP; - } else if (boost::iequals(compress_type, "DEFLATE")) { + } else if (iequal(compress_type, "DEFLATE")) { format_type = TFileFormatType::FORMAT_CSV_DEFLATE; } - } else if (boost::iequals(format_str, "JSON")) { + } else if (iequal(format_str, "JSON")) { if (compress_type.empty()) { format_type = TFileFormatType::FORMAT_JSON; } - } else if (boost::iequals(format_str, "PARQUET")) { + } else if (iequal(format_str, "PARQUET")) { format_type = TFileFormatType::FORMAT_PARQUET; } return format_type; @@ -264,7 +264,7 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct } // get format of this put - if (!http_req->header(HTTP_COMPRESS_TYPE).empty() && boost::iequals(http_req->header(HTTP_FORMAT_KEY), "JSON")) { + if (!http_req->header(HTTP_COMPRESS_TYPE).empty() && iequal(http_req->header(HTTP_FORMAT_KEY), "JSON")) { return Status::InternalError("compress data of JSON format is not supported."); } ctx->format = @@ -285,7 +285,7 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct size_t json_max_body_bytes = config::streaming_load_json_max_mb * 1024 * 1024; bool read_json_by_line = false; if (!http_req->header(HTTP_READ_JSON_BY_LINE).empty()) { - if (boost::iequals(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) { + if (iequal(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) { read_json_by_line = true; } } @@ -433,9 +433,9 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* request.__set_negative(false); } if (!http_req->header(HTTP_STRICT_MODE).empty()) { - if (boost::iequals(http_req->header(HTTP_STRICT_MODE), "false")) { + if (iequal(http_req->header(HTTP_STRICT_MODE), "false")) { request.__set_strictMode(false); - } else if (boost::iequals(http_req->header(HTTP_STRICT_MODE), "true")) { + } else if (iequal(http_req->header(HTTP_STRICT_MODE), "true")) { request.__set_strictMode(true); } else { return Status::InvalidArgument("Invalid strict mode format. 
Must be bool type"); @@ -458,7 +458,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* request.__set_json_root(http_req->header(HTTP_JSONROOT)); } if (!http_req->header(HTTP_STRIP_OUTER_ARRAY).empty()) { - if (boost::iequals(http_req->header(HTTP_STRIP_OUTER_ARRAY), "true")) { + if (iequal(http_req->header(HTTP_STRIP_OUTER_ARRAY), "true")) { request.__set_strip_outer_array(true); } else { request.__set_strip_outer_array(false); @@ -467,7 +467,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* request.__set_strip_outer_array(false); } if (!http_req->header(HTTP_NUM_AS_STRING).empty()) { - if (boost::iequals(http_req->header(HTTP_NUM_AS_STRING), "true")) { + if (iequal(http_req->header(HTTP_NUM_AS_STRING), "true")) { request.__set_num_as_string(true); } else { request.__set_num_as_string(false); @@ -476,7 +476,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* request.__set_num_as_string(false); } if (!http_req->header(HTTP_FUZZY_PARSE).empty()) { - if (boost::iequals(http_req->header(HTTP_FUZZY_PARSE), "true")) { + if (iequal(http_req->header(HTTP_FUZZY_PARSE), "true")) { request.__set_fuzzy_parse(true); } else { request.__set_fuzzy_parse(false); @@ -486,7 +486,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* } if (!http_req->header(HTTP_READ_JSON_BY_LINE).empty()) { - if (boost::iequals(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) { + if (iequal(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) { request.__set_read_json_by_line(true); } else { request.__set_read_json_by_line(false); @@ -509,7 +509,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext* } if (!http_req->header(HTTP_LOAD_TO_SINGLE_TABLET).empty()) { - if (boost::iequals(http_req->header(HTTP_LOAD_TO_SINGLE_TABLET), "true")) { + if (iequal(http_req->header(HTTP_LOAD_TO_SINGLE_TABLET), "true")) { request.__set_load_to_single_tablet(true); } else { request.__set_load_to_single_tablet(false); diff --git a/be/src/vec/exec/vparquet_scanner.cpp b/be/src/vec/exec/vparquet_scanner.cpp index 6fc7cb2a1f5eec..5ca248c1a3cbb0 100644 --- a/be/src/vec/exec/vparquet_scanner.cpp +++ b/be/src/vec/exec/vparquet_scanner.cpp @@ -87,9 +87,10 @@ Status VParquetScanner::next_arrow_batch() { Status VParquetScanner::init_arrow_batch_if_necessary() { // 1. init batch if first time // 2. 
reset reader if end of file + Status status; if (_scanner_eof || _batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) { while (!_scanner_eof) { - Status status = next_arrow_batch(); + status = next_arrow_batch(); if (_scanner_eof) { return status; } @@ -100,7 +101,7 @@ Status VParquetScanner::init_arrow_batch_if_necessary() { return status; } } - return Status::OK(); + return status; } Status VParquetScanner::init_src_block(Block* block) { @@ -111,7 +112,7 @@ Status VParquetScanner::init_src_block(Block* block) { continue; } auto* array = _batch->column(batch_pos++).get(); - auto pt = arrow_type_to_pt(array->type()->id()); + auto pt = arrow_type_to_primitive_type(array->type()->id()); if (pt == INVALID_TYPE) { return Status::NotSupported(fmt::format( "Not support arrow type:{}", array->type()->name())); @@ -179,11 +180,7 @@ Status VParquetScanner::get_next(std::vector& columns, bool* e RETURN_IF_ERROR(eval_conjunts(&src_block)); // materialize, src block => dest columns RETURN_IF_ERROR(materialize_block(&src_block, columns)); - if (_scanner_eof) { - *eof = true; - } else { - *eof = false; - } + *eof = _scanner_eof; return Status::OK(); } diff --git a/be/src/vec/utils/arrow_column_to_doris_column.cpp b/be/src/vec/utils/arrow_column_to_doris_column.cpp index fb37e08b2d1477..c535818fd3e896 100644 --- a/be/src/vec/utils/arrow_column_to_doris_column.cpp +++ b/be/src/vec/utils/arrow_column_to_doris_column.cpp @@ -66,7 +66,7 @@ namespace doris::vectorized { -const PrimitiveType arrow_type_to_pt(::arrow::Type::type type) { +const PrimitiveType arrow_type_to_primitive_type(::arrow::Type::type type) { switch(type) { # define DISPATCH(ARROW_TYPE, CPP_TYPE) \ case ARROW_TYPE: \ @@ -85,7 +85,7 @@ static size_t fill_nullable_column(const arrow::Array* array, size_t array_idx, NullMap& map_data = nullable_column->get_null_map_data(); for (size_t i = 0; i < num_elements; ++i) { auto is_null = array->IsNull(array_idx + i); - map_data.push_back(is_null); + map_data.emplace_back(is_null); null_elements_count += is_null; } return null_elements_count; diff --git a/be/src/vec/utils/arrow_column_to_doris_column.h b/be/src/vec/utils/arrow_column_to_doris_column.h index 72f2d3910edba7..a3dcf6508e73cd 100644 --- a/be/src/vec/utils/arrow_column_to_doris_column.h +++ b/be/src/vec/utils/arrow_column_to_doris_column.h @@ -30,7 +30,7 @@ // data format from Apache Arrow format. 
namespace doris::vectorized { -const PrimitiveType arrow_type_to_pt(::arrow::Type::type type); +const PrimitiveType arrow_type_to_primitive_type(::arrow::Type::type type); Status arrow_column_to_doris_column(const arrow::Array* arrow_column, size_t arrow_batch_cur_idx, From a04185b7f71443f1a354966b9ff6e0a923246126 Mon Sep 17 00:00:00 2001 From: Tao Yin <373141588@qq.com> Date: Fri, 29 Apr 2022 11:24:15 +0800 Subject: [PATCH 3/3] support implicit cast for parquet vec load --- be/src/olap/rowset/segment_v2/rle_page.h | 1 + be/src/vec/exec/vparquet_scanner.cpp | 2 +- .../org/apache/doris/analysis/Analyzer.java | 24 ++-- .../java/org/apache/doris/analysis/Expr.java | 17 +++ .../doris/analysis/TupleDescriptor.java | 9 ++ .../java/org/apache/doris/catalog/Type.java | 4 + .../main/java/org/apache/doris/load/Load.java | 114 ++++++++++++++++-- .../apache/doris/planner/BrokerScanNode.java | 4 +- .../doris/planner/StreamLoadScanNode.java | 4 +- 9 files changed, 155 insertions(+), 24 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/rle_page.h b/be/src/olap/rowset/segment_v2/rle_page.h index 52a9613cf45d1e..89d2b16cb7e6ae 100644 --- a/be/src/olap/rowset/segment_v2/rle_page.h +++ b/be/src/olap/rowset/segment_v2/rle_page.h @@ -104,6 +104,7 @@ class RlePageBuilder : public PageBuilder { void reset() override { _count = 0; + _finished = false; _rle_encoder->Clear(); _rle_encoder->Reserve(RLE_PAGE_HEADER_SIZE, 0); } diff --git a/be/src/vec/exec/vparquet_scanner.cpp b/be/src/vec/exec/vparquet_scanner.cpp index 5ca248c1a3cbb0..8317f94b0305a0 100644 --- a/be/src/vec/exec/vparquet_scanner.cpp +++ b/be/src/vec/exec/vparquet_scanner.cpp @@ -272,7 +272,7 @@ Status VParquetScanner::cast_src_block(Block* block) { { arg, { - DataTypeString().create_column_const(arg.column->size(), remove_nullable(return_type)->get_name()), + DataTypeString().create_column_const(arg.column->size(), remove_nullable(return_type)->get_family_name()), std::make_shared(), "" } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java index e46831100d8f3c..5f75aeafb721df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java @@ -330,15 +330,23 @@ public GlobalState(Catalog catalog, ConnectContext context) { mvRewriteRules.add(CountFieldToSum.INSTANCE); mvExprRewriter = new ExprRewriter(mvRewriteRules); - // compute max exec mem could be used for broadcast join - long perNodeMemLimit = context.getSessionVariable().getMaxExecMemByte(); - double autoBroadcastJoinThresholdPercentage = context.getSessionVariable().autoBroadcastJoinThreshold; - if (autoBroadcastJoinThresholdPercentage > 1) { - autoBroadcastJoinThresholdPercentage = 1.0; - } else if (autoBroadcastJoinThresholdPercentage <= 0) { - autoBroadcastJoinThresholdPercentage = -1.0; + // context may be null, e.g. for StreamLoadPlanner. + // and autoBroadcastJoinThreshold is only used for Query's DistributedPlanner.
+ // so it is ok to not set autoBroadcastJoinThreshold if context is null + if (context != null) { + // compute max exec mem could be used for broadcast join + long perNodeMemLimit = context.getSessionVariable().getMaxExecMemByte(); + double autoBroadcastJoinThresholdPercentage = context.getSessionVariable().autoBroadcastJoinThreshold; + if (autoBroadcastJoinThresholdPercentage > 1) { + autoBroadcastJoinThresholdPercentage = 1.0; + } else if (autoBroadcastJoinThresholdPercentage <= 0) { + autoBroadcastJoinThresholdPercentage = -1.0; + } + autoBroadcastJoinThreshold = (long) (perNodeMemLimit * autoBroadcastJoinThresholdPercentage); + } else { + // autoBroadcastJoinThreshold is a "final" field, must set an initial value for it + autoBroadcastJoinThreshold = 0; } - autoBroadcastJoinThreshold = (long)(perNodeMemLimit * autoBroadcastJoinThresholdPercentage); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java index 750c42f95a34ec..3778606336c236 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java @@ -184,6 +184,23 @@ public boolean apply(Expr arg) { public boolean apply(Expr arg) { return arg instanceof NullLiteral; } }; + public static final com.google.common.base.Predicate IS_VARCHAR_SLOT_REF_IMPLICIT_CAST = + new com.google.common.base.Predicate() { + @Override + public boolean apply(Expr arg) { + // exclude explicit cast. for example: cast(k1 as date) + if (!arg.isImplicitCast()) { + return false; + } + List children = arg.getChildren(); + if (children.isEmpty()) { + return false; + } + Expr child = children.get(0); + return child instanceof SlotRef && child.getType().isVarchar(); + } + }; + public void setSelectivity() { selectivity = -1; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java index 9a7b7b7bd0ec47..a5333db1b4e8a6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleDescriptor.java @@ -103,6 +103,15 @@ public ArrayList getSlots() { return slots; } + public SlotDescriptor getSlot(int slotId) { + for (SlotDescriptor slotDesc : slots) { + if (slotDesc.getId().asInt() == slotId) { + return slotDesc; + } + } + return null; + } + public void setCardinality(long cardinality) { this.cardinality = cardinality; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java index 53a2ab980e3b39..9eebfc7e12c00e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java @@ -192,6 +192,10 @@ public boolean isStringType() { || isScalarType(PrimitiveType.STRING); } + public boolean isVarchar() { + return isScalarType(PrimitiveType.VARCHAR); + } + // only metric types have the following constraint: // 1. don't support as key column // 2. 
don't support filter diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index 905f9507378a81..ed271ea87c17ca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -95,6 +95,7 @@ import org.apache.doris.task.PushTask; import org.apache.doris.thrift.TBrokerScanRangeParams; import org.apache.doris.thrift.TEtlState; +import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TMiniLoadRequest; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPriority; @@ -931,7 +932,7 @@ public static List getSchemaChangeShadowColumnDesc(Table tbl, */ public static void initColumns(Table tbl, List columnExprs, Map>> columnToHadoopFunction) throws UserException { - initColumns(tbl, columnExprs, columnToHadoopFunction, null, null, null, null, null, false); + initColumns(tbl, columnExprs, columnToHadoopFunction, null, null, null, null, null, null, false, false); } /* @@ -941,10 +942,11 @@ public static void initColumns(Table tbl, List columnExprs, public static void initColumns(Table tbl, LoadTaskInfo.ImportColumnDescs columnDescs, Map>> columnToHadoopFunction, Map exprsByName, Analyzer analyzer, TupleDescriptor srcTupleDesc, - Map slotDescByName, TBrokerScanRangeParams params) throws UserException { + Map slotDescByName, TBrokerScanRangeParams params, + TFileFormatType formatType, boolean useVectorizedLoad) throws UserException { rewriteColumns(columnDescs); initColumns(tbl, columnDescs.descs, columnToHadoopFunction, exprsByName, analyzer, - srcTupleDesc, slotDescByName, params, true); + srcTupleDesc, slotDescByName, params, formatType, useVectorizedLoad, true); } /* @@ -959,6 +961,7 @@ private static void initColumns(Table tbl, List columnExprs, Map>> columnToHadoopFunction, Map exprsByName, Analyzer analyzer, TupleDescriptor srcTupleDesc, Map slotDescByName, TBrokerScanRangeParams params, + TFileFormatType formatType, boolean useVectorizedLoad, boolean needInitSlotAndAnalyzeExprs) throws UserException { // We make a copy of the columnExprs so that our subsequent changes // to the columnExprs will not affect the original columnExprs. @@ -1044,26 +1047,52 @@ private static void initColumns(Table tbl, List columnExprs, if (!needInitSlotAndAnalyzeExprs) { return; } - + Set exprArgsColumns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); + for (ImportColumnDesc importColumnDesc : copiedColumnExprs) { + if (importColumnDesc.isColumn()) { + continue; + } + List slots = Lists.newArrayList(); + importColumnDesc.getExpr().collect(SlotRef.class, slots); + for (SlotRef slot : slots) { + String slotColumnName = slot.getColumnName(); + exprArgsColumns.add(slotColumnName); + } + } + Set excludedColumns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); // init slot desc add expr map, also transform hadoop functions for (ImportColumnDesc importColumnDesc : copiedColumnExprs) { // make column name case match with real column name String columnName = importColumnDesc.getColumnName(); - String realColName = tbl.getColumn(columnName) == null ? columnName + Column tblColumn = tbl.getColumn(columnName); + String realColName = tblColumn == null ? 
columnName : tbl.getColumn(columnName).getName(); if (importColumnDesc.getExpr() != null) { Expr expr = transformHadoopFunctionExpr(tbl, realColName, importColumnDesc.getExpr()); exprsByName.put(realColName, expr); } else { SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(srcTupleDesc); - slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR)); + // only support parquet format now + if (exprArgsColumns.contains(columnName) || formatType != TFileFormatType.FORMAT_PARQUET + || !useVectorizedLoad) { + // columns in expr args should be parsed as varchar type + slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR)); + slotDesc.setColumn(new Column(realColName, PrimitiveType.VARCHAR)); + excludedColumns.add(realColName); + // ISSUE A: src slot should be nullable even if the column is not nullable. + // because src slot is what we read from file, not represent to real column value. + // If column is not nullable, error will be thrown when filling the dest slot, + // which is not nullable. + slotDesc.setIsNullable(true); + } else { + // in vectorized load, + // columns from files like parquet files can be parsed as the type in table schema + slotDesc.setType(tblColumn.getType()); + slotDesc.setColumn(new Column(realColName, tblColumn.getType())); + // non-nullable column is allowed in vectorized load with parquet format + slotDesc.setIsNullable(tblColumn.isAllowNull()); + } slotDesc.setIsMaterialized(true); - // ISSUE A: src slot should be nullable even if the column is not nullable. - // because src slot is what we read from file, not represent to real column value. - // If column is not nullable, error will be thrown when filling the dest slot, - // which is not nullable. - slotDesc.setIsNullable(true); - slotDesc.setColumn(new Column(realColName, PrimitiveType.VARCHAR)); params.addToSrcSlotIds(slotDesc.getId().asInt()); slotDescByName.put(realColName, slotDesc); } @@ -1082,7 +1111,29 @@ private static void initColumns(Table tbl, List columnExprs, } LOG.debug("slotDescByName: {}, exprsByName: {}, mvDefineExpr: {}", slotDescByName, exprsByName, mvDefineExpr); + // we only support parquet format now + // use implicit deduction to convert columns that are not in the doris table from varchar to a more appropriate type + if (useVectorizedLoad && formatType == TFileFormatType.FORMAT_PARQUET) { + // analyze all exprs + Map cloneExprsByName = Maps.newHashMap(exprsByName); + Map cloneMvDefineExpr = Maps.newHashMap(mvDefineExpr); + analyzeAllExprs(tbl, analyzer, cloneExprsByName, cloneMvDefineExpr, slotDescByName, useVectorizedLoad); + // columns that only exist in mapping expr args, replace type with inferred from exprs, + // if there are more than one, choose the last except varchar type + // for example: + // k1 involves two mapping expr args: year(k1), t1=k1, k1's varchar type will be replaced by DATETIME + replaceVarcharWithCastType(cloneExprsByName, srcTupleDesc, excludedColumns); + } + + // in vectorized load, reanalyze exprs with castExpr type + // otherwise analyze exprs with varchar type + analyzeAllExprs(tbl, analyzer, exprsByName, mvDefineExpr, slotDescByName, useVectorizedLoad); + LOG.debug("after init column, exprMap: {}", exprsByName); + } + private static void analyzeAllExprs(Table tbl, Analyzer analyzer, Map exprsByName, + Map mvDefineExpr, Map slotDescByName, + boolean useVectorizedLoad) throws UserException { // analyze all exprs for (Map.Entry entry : exprsByName.entrySet()) { ExprSubstitutionMap smap = new ExprSubstitutionMap(); @@ -1146,7 +1197,44 @@ 
private static void initColumns(Table tbl, List columnExprs, exprsByName.put(entry.getKey(), expr); } - LOG.debug("after init column, exprMap: {}", exprsByName); + } + /** + * @param excludedColumns: columns that the type should not be inferred from expr. + * 1. column exists in both schema and expr args. + */ + private static void replaceVarcharWithCastType(Map exprsByName, TupleDescriptor srcTupleDesc, + Set excludedColumns) throws UserException { + for (Map.Entry entry : exprsByName.entrySet()) { + List casts = Lists.newArrayList(); + // exclude explicit cast. for example: cast(k1 as date) + entry.getValue().collect(Expr.IS_VARCHAR_SLOT_REF_IMPLICIT_CAST, casts); + if (casts.isEmpty()) { + continue; + } + + for (CastExpr cast : casts) { + Expr child = cast.getChild(0); + Type type = cast.getType(); + if (type.isVarchar()) { + continue; + } + + SlotRef slotRef = (SlotRef) child; + String columnName = slotRef.getColumn().getName(); + if (excludedColumns.contains(columnName)) { + continue; + } + + // replace src slot desc with cast return type + int slotId = slotRef.getSlotId().asInt(); + SlotDescriptor srcSlotDesc = srcTupleDesc.getSlot(slotId); + if (srcSlotDesc == null) { + throw new UserException("Unknown source slot descriptor. id: " + slotId); + } + srcSlotDesc.setType(type); + srcSlotDesc.setColumn(new Column(columnName, type)); + } + } } public static void rewriteColumns(LoadTaskInfo.ImportColumnDescs columnDescs) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java index 31a679aeda7fc4..0a2c63d0482acb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -36,6 +36,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.common.util.VectorizedUtil; import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.load.Load; import org.apache.doris.load.loadv2.LoadTask; @@ -261,7 +262,8 @@ private void initColumns(ParamCreateContext context) throws UserException { Load.initColumns(targetTable, columnDescs, context.fileGroup.getColumnToHadoopFunction(), context.exprMap, analyzer, - context.srcTupleDescriptor, context.slotDescByName, context.params); + context.srcTupleDescriptor, context.slotDescByName, context.params, + formatType(context.fileGroup.getFileFormat(), ""), VectorizedUtil.isVectorized()); } private TScanRangeLocations newLocations(TBrokerScanRangeParams params, BrokerDesc brokerDesc) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java index 200e232e2e9021..36d438a7e29e99 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java @@ -27,6 +27,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Table; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.VectorizedUtil; import org.apache.doris.load.Load; import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.task.LoadTaskInfo; @@ -137,7 +138,8 @@ public void init(Analyzer analyzer) throws UserException { } Load.initColumns(dstTable, columnExprDescs, null /* no hadoop function */, - exprsByName, analyzer, srcTupleDesc, 
slotDescByName, params); + exprsByName, analyzer, srcTupleDesc, slotDescByName, params, + taskInfo.getFormatType(), VectorizedUtil.isVectorized()); // analyze where statement initAndSetPrecedingFilter(taskInfo.getPrecedingFilter(), this.srcTupleDesc, analyzer);
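A reviewer's note on how the new BE pieces fit together. The sketch below is not part of the patches: it drives arrow_column_to_doris_column() over a whole arrow::RecordBatch the way VParquetScanner::init_src_block() plus append_batch_to_src_block() do, with the scanner bookkeeping stripped away. convert_whole_batch is an illustrative name, the includes are assumptions, and the function names follow the patch-2 renames.

#include <arrow/record_batch.h>

#include "vec/core/block.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/utils/arrow_column_to_doris_column.h"

namespace doris::vectorized {

// Illustrative only: convert every column of an arrow::RecordBatch into a
// Block of nullable src columns (PT0), starting from row 0.
Status convert_whole_batch(const arrow::RecordBatch& batch, const std::string& timezone,
                           Block* block) {
    for (int i = 0; i < batch.num_columns(); ++i) {
        const arrow::Array* array = batch.column(i).get();
        // map the arrow type id to a Doris primitive type (PT0)
        auto pt = arrow_type_to_primitive_type(array->type()->id());
        if (pt == INVALID_TYPE) {
            return Status::NotSupported("unsupported arrow type");
        }
        // src columns are always nullable; arrow_column_to_doris_column asserts this
        DataTypePtr data_type = DataTypeFactory::instance().create_data_type(pt, true);
        MutableColumnPtr data_column = data_type->create_column();
        block->insert(ColumnWithTypeAndName(std::move(data_column), data_type,
                                            batch.schema()->field(i)->name()));
        // copy all rows of this column; the scanner instead passes
        // _arrow_batch_cur_idx and a num_elements capped at batch_size
        RETURN_IF_ERROR(arrow_column_to_doris_column(array, 0, block->get_by_position(i),
                                                     batch.num_rows(), timezone));
    }
    return Status::OK();
}

} // namespace doris::vectorized

Patch 3 complements this on the FE side: for vectorized parquet loads the src slots keep the table column type, or the type that replaceVarcharWithCastType() deduces from the mapping exprs, instead of defaulting to VARCHAR, so the BE-side cast_src_block() step has real target types to cast to.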