diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp b/be/src/vec/sink/writer/vhive_partition_writer.cpp
index 4504faf0e9c0dd..189e0211e52162 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -31,19 +31,19 @@
 namespace doris {
 namespace vectorized {
 
-VHivePartitionWriter::VHivePartitionWriter(
-        const TDataSink& t_sink, std::string partition_name, TUpdateMode::type update_mode,
-        const VExprContextSPtrs& output_expr_ctxs, const VExprContextSPtrs& write_output_expr_ctxs,
-        const std::set<size_t>& non_write_columns_indices, const std::vector<THiveColumn>& columns,
-        WriteInfo write_info, std::string file_name, int file_name_index,
-        TFileFormatType::type file_format_type, TFileCompressType::type hive_compress_type,
-        const std::map<std::string, std::string>& hadoop_conf)
+VHivePartitionWriter::VHivePartitionWriter(const TDataSink& t_sink, std::string partition_name,
+                                           TUpdateMode::type update_mode,
+                                           const VExprContextSPtrs& write_output_expr_ctxs,
+                                           std::vector<std::string> write_column_names,
+                                           WriteInfo write_info, std::string file_name,
+                                           int file_name_index,
+                                           TFileFormatType::type file_format_type,
+                                           TFileCompressType::type hive_compress_type,
+                                           const std::map<std::string, std::string>& hadoop_conf)
         : _partition_name(std::move(partition_name)),
           _update_mode(update_mode),
-          _vec_output_expr_ctxs(output_expr_ctxs),
           _write_output_expr_ctxs(write_output_expr_ctxs),
-          _non_write_columns_indices(non_write_columns_indices),
-          _columns(columns),
+          _write_column_names(std::move(write_column_names)),
          _write_info(std::move(write_info)),
          _file_name(std::move(file_name)),
          _file_name_index(file_name_index),
@@ -63,14 +63,6 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
             fmt::format("{}/{}", _write_info.write_path, _get_target_file_name()), 0,
             _file_writer, &file_writer_options));
 
-    std::vector<std::string> column_names;
-    column_names.reserve(_columns.size());
-    for (int i = 0; i < _columns.size(); i++) {
-        if (_non_write_columns_indices.find(i) == _non_write_columns_indices.end()) {
-            column_names.emplace_back(_columns[i].name);
-        }
-    }
-
     switch (_file_format_type) {
     case TFileFormatType::FORMAT_PARQUET: {
         bool parquet_disable_dictionary = false;
@@ -94,7 +86,7 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
             }
         }
         _file_format_transformer.reset(new VParquetTransformer(
-                state, _file_writer.get(), _write_output_expr_ctxs, std::move(column_names),
+                state, _file_writer.get(), _write_output_expr_ctxs, _write_column_names,
                 parquet_compression_type, parquet_disable_dictionary, TParquetVersion::PARQUET_1_0,
                 false));
         return _file_format_transformer->open();
@@ -125,7 +117,7 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
 
         _file_format_transformer.reset(
                 new VOrcTransformer(state, _file_writer.get(), _write_output_expr_ctxs,
-                                    std::move(column_names), false, orc_compression_type));
+                                    _write_column_names, false, orc_compression_type));
         return _file_format_transformer->open();
     }
     default: {
@@ -156,43 +148,12 @@ Status VHivePartitionWriter::close(const Status& status) {
     return Status::OK();
 }
 
-Status VHivePartitionWriter::write(vectorized::Block& block, vectorized::IColumn::Filter* filter) {
-    Block output_block;
-    RETURN_IF_ERROR(_projection_and_filter_block(block, filter, &output_block));
-    RETURN_IF_ERROR(_file_format_transformer->write(output_block));
-    _row_count += output_block.rows();
+Status VHivePartitionWriter::write(vectorized::Block& block) {
+    RETURN_IF_ERROR(_file_format_transformer->write(block));
+    _row_count += block.rows();
     return Status::OK();
 }
 
-Status VHivePartitionWriter::_projection_and_filter_block(doris::vectorized::Block& input_block,
-                                                          const vectorized::IColumn::Filter* filter,
-                                                          doris::vectorized::Block* output_block) {
-    Status status = Status::OK();
-    if (input_block.rows() == 0) {
-        return status;
-    }
-    RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
-            _vec_output_expr_ctxs, input_block, output_block, true));
-    materialize_block_inplace(*output_block);
-
-    if (filter == nullptr) {
-        return status;
-    }
-
-    std::vector<uint32_t> columns_to_filter;
-    int column_to_keep = input_block.columns();
-    columns_to_filter.resize(column_to_keep);
-    for (uint32_t i = 0; i < column_to_keep; ++i) {
-        columns_to_filter[i] = i;
-    }
-
-    Block::filter_block_internal(output_block, columns_to_filter, *filter);
-
-    output_block->erase(_non_write_columns_indices);
-
-    return status;
-}
-
 THivePartitionUpdate VHivePartitionWriter::_build_partition_update() {
     THivePartitionUpdate hive_partition_update;
     hive_partition_update.__set_name(_partition_name);
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.h b/be/src/vec/sink/writer/vhive_partition_writer.h
index 912ac8b1e496b2..a3a8cb9c7a35c9 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.h
+++ b/be/src/vec/sink/writer/vhive_partition_writer.h
@@ -44,11 +44,10 @@ class VHivePartitionWriter {
         TFileType::type file_type;
     };
 
-    VHivePartitionWriter(const TDataSink& t_sink, const std::string partition_name,
-                         TUpdateMode::type update_mode, const VExprContextSPtrs& output_expr_ctxs,
+    VHivePartitionWriter(const TDataSink& t_sink, std::string partition_name,
+                         TUpdateMode::type update_mode,
                          const VExprContextSPtrs& write_output_expr_ctxs,
-                         const std::set<size_t>& non_write_columns_indices,
-                         const std::vector<THiveColumn>& columns, WriteInfo write_info,
+                         std::vector<std::string> write_column_names, WriteInfo write_info,
                          std::string file_name, int file_name_index,
                          TFileFormatType::type file_format_type,
                          TFileCompressType::type hive_compress_type,
@@ -58,7 +57,7 @@ class VHivePartitionWriter {
 
     Status open(RuntimeState* state, RuntimeProfile* profile);
 
-    Status write(vectorized::Block& block, IColumn::Filter* filter = nullptr);
+    Status write(vectorized::Block& block);
 
     Status close(const Status& status);
 
@@ -72,10 +71,6 @@ class VHivePartitionWriter {
     std::string _get_target_file_name();
 
 private:
-    Status _projection_and_filter_block(doris::vectorized::Block& input_block,
-                                        const vectorized::IColumn::Filter* filter,
-                                        doris::vectorized::Block* output_block);
-
     THivePartitionUpdate _build_partition_update();
 
     std::string _get_file_extension(TFileFormatType::type file_format_type,
@@ -89,11 +84,10 @@ class VHivePartitionWriter {
 
     size_t _row_count = 0;
 
-    const VExprContextSPtrs& _vec_output_expr_ctxs;
     const VExprContextSPtrs& _write_output_expr_ctxs;
-    const std::set<size_t>& _non_write_columns_indices;
-    const std::vector<THiveColumn>& _columns;
+    std::vector<std::string> _write_column_names;
+
     WriteInfo _write_info;
     std::string _file_name;
     int _file_name_index;
diff --git a/be/src/vec/sink/writer/vhive_table_writer.cpp b/be/src/vec/sink/writer/vhive_table_writer.cpp
index 76f16e3daaa4d4..0e64060eb0baae 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_table_writer.cpp
@@ -20,6 +20,7 @@
 #include "runtime/runtime_state.h"
 #include "vec/core/block.h"
 #include "vec/core/column_with_type_and_name.h"
+#include "vec/core/materialize_block.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/exprs/vexpr_context.h"
 #include "vec/sink/writer/vhive_partition_writer.h"
@@ -82,8 +83,17 @@ Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
 
 Status VHiveTableWriter::write(vectorized::Block& block) {
     SCOPED_RAW_TIMER(&_send_data_ns);
+
+    if (block.rows() == 0) {
+        return Status::OK();
+    }
+    Block output_block;
+    RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
+            _vec_output_expr_ctxs, block, &output_block, false));
+    materialize_block_inplace(output_block);
+
     std::unordered_map<std::shared_ptr<VHivePartitionWriter>, IColumn::Filter> writer_positions;
-    _row_count += block.rows();
+    _row_count += output_block.rows();
 
     auto& hive_table_sink = _t_sink.hive_table_sink;
     if (_partition_columns_input_index.empty()) {
@@ -93,7 +103,7 @@ Status VHiveTableWriter::write(vectorized::Block& block) {
         auto writer_iter = _partitions_to_writers.find("");
         if (writer_iter == _partitions_to_writers.end()) {
             try {
-                writer = _create_partition_writer(block, -1);
+                writer = _create_partition_writer(output_block, -1);
             } catch (doris::Exception& e) {
                 return e.to_status();
             }
@@ -109,7 +119,7 @@ Status VHiveTableWriter::write(vectorized::Block& block) {
                 }
                 _partitions_to_writers.erase(writer_iter);
                 try {
-                    writer = _create_partition_writer(block, -1, &file_name,
+                    writer = _create_partition_writer(output_block, -1, &file_name,
                                                       file_name_index + 1);
                 } catch (doris::Exception& e) {
                     return e.to_status();
@@ -122,16 +132,17 @@ Status VHiveTableWriter::write(vectorized::Block& block) {
             }
         }
         SCOPED_RAW_TIMER(&_partition_writers_write_ns);
-        RETURN_IF_ERROR(writer->write(block));
+        output_block.erase(_non_write_columns_indices);
+        RETURN_IF_ERROR(writer->write(output_block));
         return Status::OK();
     }
 
     {
         SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);
-        for (int i = 0; i < block.rows(); ++i) {
+        for (int i = 0; i < output_block.rows(); ++i) {
             std::vector<std::string> partition_values;
             try {
-                partition_values = _create_partition_values(block, i);
+                partition_values = _create_partition_values(output_block, i);
             } catch (doris::Exception& e) {
                 return e.to_status();
             }
@@ -143,10 +154,10 @@ Status VHiveTableWriter::write(vectorized::Block& block) {
                         const std::string* file_name, int file_name_index,
                         std::shared_ptr<VHivePartitionWriter>& writer_ptr) -> Status {
                 try {
-                    auto writer =
-                            _create_partition_writer(block, position, file_name, file_name_index);
+                    auto writer = _create_partition_writer(output_block, position, file_name,
+                                                           file_name_index);
                     RETURN_IF_ERROR(writer->open(_state, _profile));
-                    IColumn::Filter filter(block.rows(), 0);
+                    IColumn::Filter filter(output_block.rows(), 0);
                     filter[position] = 1;
                     writer_positions.insert({writer, std::move(filter)});
                     _partitions_to_writers.insert({partition_name, writer});
@@ -185,7 +196,7 @@ Status VHiveTableWriter::write(vectorized::Block& block) {
             }
             auto writer_pos_iter = writer_positions.find(writer);
             if (writer_pos_iter == writer_positions.end()) {
-                IColumn::Filter filter(block.rows(), 0);
+                IColumn::Filter filter(output_block.rows(), 0);
                 filter[i] = 1;
                 writer_positions.insert({writer, std::move(filter)});
             } else {
@@ -195,12 +206,39 @@ Status VHiveTableWriter::write(vectorized::Block& block) {
         }
     }
     SCOPED_RAW_TIMER(&_partition_writers_write_ns);
+    output_block.erase(_non_write_columns_indices);
     for (auto it = writer_positions.begin(); it != writer_positions.end(); ++it) {
-        RETURN_IF_ERROR(it->first->write(block, &it->second));
+        Block filtered_block;
+        RETURN_IF_ERROR(_filter_block(output_block, &it->second, &filtered_block));
+        RETURN_IF_ERROR(it->first->write(filtered_block));
     }
     return Status::OK();
 }
 
+Status VHiveTableWriter::_filter_block(doris::vectorized::Block& block,
+                                       const vectorized::IColumn::Filter* filter,
+                                       doris::vectorized::Block* output_block) {
+    const ColumnsWithTypeAndName& columns_with_type_and_name =
+            block.get_columns_with_type_and_name();
+    vectorized::ColumnsWithTypeAndName result_columns;
+    for (int i = 0; i < columns_with_type_and_name.size(); ++i) {
+        const auto& col = columns_with_type_and_name[i];
+        result_columns.emplace_back(col.column->clone_resized(col.column->size()), col.type,
+                                    col.name);
+    }
+    *output_block = {std::move(result_columns)};
+
+    std::vector<uint32_t> columns_to_filter;
+    int column_to_keep = output_block->columns();
+    columns_to_filter.resize(column_to_keep);
+    for (uint32_t i = 0; i < column_to_keep; ++i) {
+        columns_to_filter[i] = i;
+    }
+
+    Block::filter_block_internal(output_block, columns_to_filter, *filter);
+    return Status::OK();
+}
+
 Status VHiveTableWriter::close(Status status) {
     int64_t partitions_to_writers_size = _partitions_to_writers.size();
     {
@@ -312,11 +350,18 @@ std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer
     }
 
     _write_file_count++;
+    std::vector<std::string> column_names;
+    column_names.reserve(hive_table_sink.columns.size());
+    for (int i = 0; i < hive_table_sink.columns.size(); i++) {
+        if (_non_write_columns_indices.find(i) == _non_write_columns_indices.end()) {
+            column_names.emplace_back(hive_table_sink.columns[i].name);
+        }
+    }
     return std::make_shared<VHivePartitionWriter>(
-            _t_sink, std::move(partition_name), update_mode, _vec_output_expr_ctxs,
-            _write_output_vexpr_ctxs, _non_write_columns_indices, hive_table_sink.columns,
-            std::move(write_info), (file_name == nullptr) ? _compute_file_name() : *file_name,
-            file_name_index, file_format_type, write_compress_type, hive_table_sink.hadoop_config);
+            _t_sink, std::move(partition_name), update_mode, _write_output_vexpr_ctxs,
+            std::move(column_names), std::move(write_info),
+            (file_name == nullptr) ? _compute_file_name() : *file_name, file_name_index,
+            file_format_type, write_compress_type, hive_table_sink.hadoop_config);
 }
 
 std::vector<std::string> VHiveTableWriter::_create_partition_values(vectorized::Block& block,
diff --git a/be/src/vec/sink/writer/vhive_table_writer.h b/be/src/vec/sink/writer/vhive_table_writer.h
index 3a3f45a6db1c1d..4989ba443c7e20 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.h
+++ b/be/src/vec/sink/writer/vhive_table_writer.h
@@ -20,6 +20,7 @@
 #include <unordered_map>
 
 #include "util/runtime_profile.h"
+#include "vec/columns/column.h"
 #include "vec/exprs/vexpr_fwd.h"
 #include "vec/sink/writer/async_result_writer.h"
 
@@ -62,6 +63,9 @@ class VHiveTableWriter final : public AsyncResultWriter {
 
     std::string _compute_file_name();
 
+    Status _filter_block(doris::vectorized::Block& block, const vectorized::IColumn::Filter* filter,
+                         doris::vectorized::Block* output_block);
+
     // Currently it is a copy, maybe it is better to use move semantics to eliminate it.
     TDataSink _t_sink;
     RuntimeState* _state = nullptr;
diff --git a/regression-test/data/external_table_p0/hive/ddl/test_hive_write_type.out b/regression-test/data/external_table_p0/hive/ddl/test_hive_write_type.out
index 1f7d39971db63a..20d2758c2a64e3 100644
--- a/regression-test/data/external_table_p0/hive/ddl/test_hive_write_type.out
+++ b/regression-test/data/external_table_p0/hive/ddl/test_hive_write_type.out
@@ -8,6 +8,12 @@ a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d
 a b c d e 1.1 12345 0.12345678 string \N \N \N \N \N \N \N \N \N \N \N
 a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d"] ["string1", "string2"] [{1:"a"}, {2:"b"}] {1234567890123456789:"a"} {1234567890123456789:0.12345678} {"key":["char1", "char2"]} {"id": 1, "gender": 1, "name": "John Doe"} {"scale": 123.4567, "metric": ["metric1", "metric2"]} {"codes": [123, 456], "props": {"key1":["char1", "char2"]}}
 
+-- !columns_out_of_order01 --
+3 6 1 4 2 5
+
+-- !columns_out_of_order02 --
+1 2 3 4 5 6
+
 -- !complex_type01 --
 a \N \N \N \N \N \N \N \N \N ["char1", "char2"] ["c", "d"] ["string1", "string2"] [{1:"a"}, {2:"b"}] {1234567890123456789:"a"} {1234567890123456789:0.12345678} {"key":["char1", "char2"]} {"id": 1, "gender": 1, "name": "John Doe"} {"scale": 123.4567, "metric": ["metric1", "metric2"]} \N
 a b c d e 1.1 12345 0.12345678 string \N \N \N \N \N \N \N \N \N \N \N
@@ -17,6 +23,12 @@ a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d
 a b c d e 1.1 12345 0.12345678 string \N \N \N \N \N \N \N \N \N \N \N
 a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d"] ["string1", "string2"] [{1:"a"}, {2:"b"}] {1234567890123456789:"a"} {1234567890123456789:0.12345678} {"key":["char1", "char2"]} {"id": 1, "gender": 1, "name": "John Doe"} {"scale": 123.4567, "metric": ["metric1", "metric2"]} {"codes": [123, 456], "props": {"key1":["char1", "char2"]}}
 
+-- !columns_out_of_order01 --
+3 6 1 4 2 5
+
+-- !columns_out_of_order02 --
+1 2 3 4 5 6
+
 -- !complex_type01 --
 a \N \N \N \N \N \N \N \N \N ["char1", "char2"] ["c", "d"] ["string1", "string2"] [{1:"a"}, {2:"b"}] {1234567890123456789:"a"} {1234567890123456789:0.12345678} {"key":["char1", "char2"]} {"id": 1, "gender": 1, "name": "John Doe"} {"scale": 123.4567, "metric": ["metric1", "metric2"]} \N
 a b c d e 1.1 12345 0.12345678 string \N \N \N \N \N \N \N \N \N \N \N
@@ -26,6 +38,12 @@ a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d
 a b c d e 1.1 12345 0.12345678 string \N \N \N \N \N \N \N \N \N \N \N
 a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d"] ["string1", "string2"] [{1:"a"}, {2:"b"}] {1234567890123456789:"a"} {1234567890123456789:0.12345678} {"key":["char1", "char2"]} {"id": 1, "gender": 1, "name": "John Doe"} {"scale": 123.4567, "metric": ["metric1", "metric2"]} {"codes": [123, 456], "props": {"key1":["char1", "char2"]}}
 
+-- !columns_out_of_order01 --
+3 6 1 4 2 5
+
+-- !columns_out_of_order02 --
+1 2 3 4 5 6
+
 -- !complex_type01 --
 a \N \N \N \N \N \N \N \N \N ["char1", "char2"] ["c", "d"] ["string1", "string2"] [{1:"a"}, {2:"b"}] {1234567890123456789:"a"} {1234567890123456789:0.12345678} {"key":["char1", "char2"]} {"id": 1, "gender": 1, "name": "John Doe"} {"scale": 123.4567, "metric": ["metric1", "metric2"]} \N
 a b c d e 1.1 12345 0.12345678 string \N \N \N \N \N \N \N \N \N \N \N
@@ -34,3 +52,10 @@ a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d
 -- !complex_type02 --
 a b c d e 1.1 12345 0.12345678 string \N \N \N \N \N \N \N \N \N \N \N
 a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d"] ["string1", "string2"] [{1:"a"}, {2:"b"}] {1234567890123456789:"a"} {1234567890123456789:0.12345678} {"key":["char1", "char2"]} {"id": 1, "gender": 1, "name": "John Doe"} {"scale": 123.4567, "metric": ["metric1", "metric2"]} {"codes": [123, 456], "props": {"key1":["char1", "char2"]}}
+
+-- !columns_out_of_order01 --
+3 6 1 4 2 5
+
+-- !columns_out_of_order02 --
+1 2 3 4 5 6
+
diff --git a/regression-test/suites/external_table_p0/hive/ddl/test_hive_write_type.groovy b/regression-test/suites/external_table_p0/hive/ddl/test_hive_write_type.groovy
index cf3bd2a9037a48..0e1e1355afd3e0 100644
--- a/regression-test/suites/external_table_p0/hive/ddl/test_hive_write_type.groovy
+++ b/regression-test/suites/external_table_p0/hive/ddl/test_hive_write_type.groovy
@@ -266,6 +266,62 @@ suite("test_hive_write_type", "p0,external,hive,external_docker,external_docker_
         sql """ DROP DATABASE ${catalog_name}.test_hive_ex """
     }
 
+    def test_columns_out_of_order = { String file_format, String catalog_name ->
+        sql """ switch ${catalog_name} """
+        sql """ create database if not exists `test_columns_out_of_order` """;
+        sql """ use `${catalog_name}`.`test_columns_out_of_order` """
+
+        sql """ drop table if exists columns_out_of_order_source_tbl_${file_format} """
+        sql """
+            CREATE TABLE columns_out_of_order_source_tbl_${file_format} (
+                `col3` bigint,
+                `col6` int,
+                `col1` bigint,
+                `col4` int,
+                `col2` bigint,
+                `col5` int
+            ) ENGINE = hive
+            PROPERTIES (
+                'file_format'='${file_format}'
+            )
+        """;
+        sql """ drop table if exists columns_out_of_order_target_tbl_${file_format} """
+        sql """
+            CREATE TABLE columns_out_of_order_target_tbl_${file_format} (
+                `col1` bigint,
+                `col2` bigint,
+                `col3` bigint,
+                `col4` int,
+                `col5` int,
+                `col6` int
+            ) ENGINE = hive PARTITION BY LIST (
+                col4, col5, col6
+            )()
+            PROPERTIES (
+                'file_format'='${file_format}'
+            )
+        """;
+
+        sql """
+            INSERT INTO columns_out_of_order_source_tbl_${file_format} (
+                col1, col2, col3, col4, col5, col6
+            ) VALUES (1, 2, 3, 4, 5, 6);
+        """
+        order_qt_columns_out_of_order01 """ SELECT * FROM columns_out_of_order_source_tbl_${file_format} """
+
+        sql """
+            INSERT INTO columns_out_of_order_target_tbl_${file_format} (
+                col1, col2, col3, col4, col5, col6
+            ) VALUES (1, 2, 3, 4, 5, 6);
+        """
+
+        order_qt_columns_out_of_order02 """ SELECT * FROM columns_out_of_order_target_tbl_${file_format} """
+
+        sql """ drop table columns_out_of_order_source_tbl_${file_format} """
+        sql """ drop table columns_out_of_order_target_tbl_${file_format} """
+        sql """ drop database if exists `test_columns_out_of_order` """;
+    }
+
     try {
         String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
         String hdfs_port = context.config.otherConfigs.get(hivePrefix + "HdfsPort")
@@ -285,6 +341,7 @@ suite("test_hive_write_type", "p0,external,hive,external_docker,external_docker_
                 logger.info("Process file format" + file_format)
                 test_complex_type_tbl(file_format, catalog_name)
                 test_insert_exception(file_format, catalog_name)
+                test_columns_out_of_order(file_format, catalog_name)
             }
             sql """drop catalog if exists ${catalog_name}"""
         } finally {