From a22d327880b19adcfe63f7ecb26058f2a24f53c9 Mon Sep 17 00:00:00 2001
From: Qi Chen
Date: Tue, 28 May 2024 14:09:15 +0800
Subject: [PATCH] [Fix](hive-writer) Fix partition column order issue when the
 order of the partition fields inserted into the target table is inconsistent
 with the field order and schema field order of the query's source table.
 (#35347)

Fix the partition column order issue that appears when the partition fields
inserted into the target table do not follow the field order (and schema field
order) of the query's source table. The output expressions are now evaluated
once in VHiveTableWriter::write(), before rows are dispatched to partition
writers, so each VHivePartitionWriter receives a block whose columns already
match the target table's schema order.

Please look at
`regression-test/suites/external_table_p0/hive/ddl/test_hive_write_type.groovy`
in the PR.
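A minimal sketch of the scenario, adapted from the regression test added in
this PR (`test_hive_write_type.groovy`). The table names and column layout come
from that test; the 'orc' file format here merely stands in for the
`${file_format}` parameter the test iterates over:

    CREATE TABLE columns_out_of_order_source_tbl (
        `col3` bigint, `col6` int, `col1` bigint,
        `col4` int, `col2` bigint, `col5` int
    ) ENGINE = hive PROPERTIES ('file_format'='orc');

    CREATE TABLE columns_out_of_order_target_tbl (
        `col1` bigint, `col2` bigint, `col3` bigint,
        `col4` int, `col5` int, `col6` int
    ) ENGINE = hive PARTITION BY LIST (col4, col5, col6) ()
    PROPERTIES ('file_format'='orc');

    -- Before this fix, writing into the partitioned target table could place
    -- values under the wrong columns, because the written column order did
    -- not follow the target table's schema order.
    INSERT INTO columns_out_of_order_target_tbl (col1, col2, col3, col4, col5, col6)
    VALUES (1, 2, 3, 4, 5, 6);

The source table intentionally declares its columns in a different order
(col3, col6, col1, col4, col2, col5) than the partitioned target table, which
is what previously triggered the mismatched partition columns.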
---
 .../sink/writer/vhive_partition_writer.cpp    | 69 ++++-------------
 .../vec/sink/writer/vhive_partition_writer.h  | 18 ++---
 be/src/vec/sink/writer/vhive_table_writer.cpp | 75 +++++++++++++++----
 be/src/vec/sink/writer/vhive_table_writer.h   |  4 +
 .../hive/ddl/test_hive_write_type.out         | 25 +++++++
 .../hive/ddl/test_hive_write_type.groovy      | 57 ++++++++++++++
 6 files changed, 167 insertions(+), 81 deletions(-)

diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp b/be/src/vec/sink/writer/vhive_partition_writer.cpp
index 4504faf0e9c0dd..189e0211e52162 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -31,19 +31,19 @@
 namespace doris {
 namespace vectorized {
 
-VHivePartitionWriter::VHivePartitionWriter(
-        const TDataSink& t_sink, std::string partition_name, TUpdateMode::type update_mode,
-        const VExprContextSPtrs& output_expr_ctxs, const VExprContextSPtrs& write_output_expr_ctxs,
-        const std::set<size_t>& non_write_columns_indices, const std::vector<THiveColumn>& columns,
-        WriteInfo write_info, std::string file_name, int file_name_index,
-        TFileFormatType::type file_format_type, TFileCompressType::type hive_compress_type,
-        const std::map<std::string, std::string>& hadoop_conf)
+VHivePartitionWriter::VHivePartitionWriter(const TDataSink& t_sink, std::string partition_name,
+                                           TUpdateMode::type update_mode,
+                                           const VExprContextSPtrs& write_output_expr_ctxs,
+                                           std::vector<std::string> write_column_names,
+                                           WriteInfo write_info, std::string file_name,
+                                           int file_name_index,
+                                           TFileFormatType::type file_format_type,
+                                           TFileCompressType::type hive_compress_type,
+                                           const std::map<std::string, std::string>& hadoop_conf)
         : _partition_name(std::move(partition_name)),
           _update_mode(update_mode),
-          _vec_output_expr_ctxs(output_expr_ctxs),
           _write_output_expr_ctxs(write_output_expr_ctxs),
-          _non_write_columns_indices(non_write_columns_indices),
-          _columns(columns),
+          _write_column_names(std::move(write_column_names)),
           _write_info(std::move(write_info)),
           _file_name(std::move(file_name)),
           _file_name_index(file_name_index),
@@ -63,14 +63,6 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
             fmt::format("{}/{}", _write_info.write_path, _get_target_file_name()), 0,
             _file_writer, &file_writer_options));
 
-    std::vector<std::string> column_names;
-    column_names.reserve(_columns.size());
-    for (int i = 0; i < _columns.size(); i++) {
-        if (_non_write_columns_indices.find(i) == _non_write_columns_indices.end()) {
-            column_names.emplace_back(_columns[i].name);
-        }
-    }
-
     switch (_file_format_type) {
     case TFileFormatType::FORMAT_PARQUET: {
         bool parquet_disable_dictionary = false;
@@ -94,7 +86,7 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
             }
         }
         _file_format_transformer.reset(new VParquetTransformer(
-                state, _file_writer.get(), _write_output_expr_ctxs, std::move(column_names),
+                state, _file_writer.get(), _write_output_expr_ctxs, _write_column_names,
                 parquet_compression_type, parquet_disable_dictionary, TParquetVersion::PARQUET_1_0,
                 false));
         return _file_format_transformer->open();
@@ -125,7 +117,7 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
 
         _file_format_transformer.reset(
                 new VOrcTransformer(state, _file_writer.get(), _write_output_expr_ctxs,
-                                    std::move(column_names), false, orc_compression_type));
+                                    _write_column_names, false, orc_compression_type));
         return _file_format_transformer->open();
     }
     default: {
@@ -156,43 +148,12 @@ Status VHivePartitionWriter::close(const Status& status) {
     return Status::OK();
 }
 
-Status VHivePartitionWriter::write(vectorized::Block& block, vectorized::IColumn::Filter* filter) {
-    Block output_block;
-    RETURN_IF_ERROR(_projection_and_filter_block(block, filter, &output_block));
-    RETURN_IF_ERROR(_file_format_transformer->write(output_block));
-    _row_count += output_block.rows();
+Status VHivePartitionWriter::write(vectorized::Block& block) {
+    RETURN_IF_ERROR(_file_format_transformer->write(block));
+    _row_count += block.rows();
     return Status::OK();
 }
 
-Status VHivePartitionWriter::_projection_and_filter_block(doris::vectorized::Block& input_block,
-                                                          const vectorized::IColumn::Filter* filter,
-                                                          doris::vectorized::Block* output_block) {
-    Status status = Status::OK();
-    if (input_block.rows() == 0) {
-        return status;
-    }
-    RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
-            _vec_output_expr_ctxs, input_block, output_block, true));
-    materialize_block_inplace(*output_block);
-
-    if (filter == nullptr) {
-        return status;
-    }
-
-    std::vector<uint32_t> columns_to_filter;
-    int column_to_keep = input_block.columns();
-    columns_to_filter.resize(column_to_keep);
-    for (uint32_t i = 0; i < column_to_keep; ++i) {
-        columns_to_filter[i] = i;
-    }
-
-    Block::filter_block_internal(output_block, columns_to_filter, *filter);
-
-    output_block->erase(_non_write_columns_indices);
-
-    return status;
-}
-
 THivePartitionUpdate VHivePartitionWriter::_build_partition_update() {
     THivePartitionUpdate hive_partition_update;
     hive_partition_update.__set_name(_partition_name);
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.h b/be/src/vec/sink/writer/vhive_partition_writer.h
index 912ac8b1e496b2..a3a8cb9c7a35c9 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.h
+++ b/be/src/vec/sink/writer/vhive_partition_writer.h
@@ -44,11 +44,10 @@ class VHivePartitionWriter {
         TFileType::type file_type;
     };
 
-    VHivePartitionWriter(const TDataSink& t_sink, const std::string partition_name,
-                         TUpdateMode::type update_mode, const VExprContextSPtrs& output_expr_ctxs,
+    VHivePartitionWriter(const TDataSink& t_sink, std::string partition_name,
+                         TUpdateMode::type update_mode,
                          const VExprContextSPtrs& write_output_expr_ctxs,
-                         const std::set<size_t>& non_write_columns_indices,
-                         const std::vector<THiveColumn>& columns, WriteInfo write_info,
+                         std::vector<std::string> write_column_names, WriteInfo write_info,
                          std::string file_name, int file_name_index,
                          TFileFormatType::type file_format_type,
                          TFileCompressType::type hive_compress_type,
@@ -58,7 +57,7 @@ class VHivePartitionWriter {
 
     Status open(RuntimeState* state, RuntimeProfile* profile);
 
-    Status write(vectorized::Block& block, IColumn::Filter* filter = nullptr);
+    Status write(vectorized::Block& block);
 
     Status close(const Status& status);
 
@@ -72,10 +71,6 @@ class VHivePartitionWriter {
     std::string _get_target_file_name();
 
 private:
-    Status _projection_and_filter_block(doris::vectorized::Block& input_block,
-                                        const vectorized::IColumn::Filter* filter,
-                                        doris::vectorized::Block* output_block);
-
     THivePartitionUpdate _build_partition_update();
 
     std::string _get_file_extension(TFileFormatType::type file_format_type,
@@ -89,11 +84,10 @@ class VHivePartitionWriter {
 
     size_t _row_count = 0;
 
-    const VExprContextSPtrs& _vec_output_expr_ctxs;
     const VExprContextSPtrs& _write_output_expr_ctxs;
-    const std::set<size_t>& _non_write_columns_indices;
-    const std::vector<THiveColumn>& _columns;
+    std::vector<std::string> _write_column_names;
+
     WriteInfo _write_info;
     std::string _file_name;
     int _file_name_index;
diff --git a/be/src/vec/sink/writer/vhive_table_writer.cpp b/be/src/vec/sink/writer/vhive_table_writer.cpp
index 76f16e3daaa4d4..0e64060eb0baae 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_table_writer.cpp
@@ -20,6 +20,7 @@
 #include "runtime/runtime_state.h"
 #include "vec/core/block.h"
 #include "vec/core/column_with_type_and_name.h"
+#include "vec/core/materialize_block.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/exprs/vexpr_context.h"
 #include "vec/sink/writer/vhive_partition_writer.h"
@@ -82,8 +83,17 @@ Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
 
 Status VHiveTableWriter::write(vectorized::Block& block) {
     SCOPED_RAW_TIMER(&_send_data_ns);
+
+    if (block.rows() == 0) {
+        return Status::OK();
+    }
+    Block output_block;
+    RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
+            _vec_output_expr_ctxs, block, &output_block, false));
+    materialize_block_inplace(output_block);
+
     std::unordered_map<std::shared_ptr<VHivePartitionWriter>, IColumn::Filter> writer_positions;
-    _row_count += block.rows();
+    _row_count += output_block.rows();
 
     auto& hive_table_sink = _t_sink.hive_table_sink;
     if (_partition_columns_input_index.empty()) {
@@ -93,7 +103,7 @@ Status VHiveTableWriter::write(vectorized::Block& block) {
         auto writer_iter = _partitions_to_writers.find("");
         if (writer_iter == _partitions_to_writers.end()) {
             try {
-                writer = _create_partition_writer(block, -1);
+                writer = _create_partition_writer(output_block, -1);
             } catch (doris::Exception& e) {
                 return e.to_status();
             }
@@ -109,7 +119,7 @@ Status VHiveTableWriter::write(vectorized::Block& block) {
                 }
                 _partitions_to_writers.erase(writer_iter);
                 try {
-                    writer = _create_partition_writer(block, -1, &file_name,
+                    writer = _create_partition_writer(output_block, -1, &file_name,
                                                       file_name_index + 1);
                 } catch (doris::Exception& e) {
                     return e.to_status();
@@ -122,16 +132,17 @@ Status VHiveTableWriter::write(vectorized::Block& block) {
             }
         }
         SCOPED_RAW_TIMER(&_partition_writers_write_ns);
-        RETURN_IF_ERROR(writer->write(block));
+        output_block.erase(_non_write_columns_indices);
+        RETURN_IF_ERROR(writer->write(output_block));
         return Status::OK();
     }
 
     {
         SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);
-        for (int i = 0; i < block.rows(); ++i) {
+        for (int i = 0; i < output_block.rows(); ++i) {
             std::vector<std::string> partition_values;
             try {
-                partition_values = _create_partition_values(block, i);
+                partition_values = _create_partition_values(output_block, i);
             } catch (doris::Exception& e) {
                 return e.to_status();
             }
@@ -143,10 +154,10 @@ Status VHiveTableWriter::write(vectorized::Block& block) {
                                               const std::string* file_name, int file_name_index,
                                               std::shared_ptr<VHivePartitionWriter>& writer_ptr) -> Status {
                 try {
-                    auto writer =
-                            _create_partition_writer(block, position, file_name, file_name_index);
+                    auto writer = _create_partition_writer(output_block, position, file_name,
+                                                           file_name_index);
                     RETURN_IF_ERROR(writer->open(_state, _profile));
-                    IColumn::Filter filter(block.rows(), 0);
+                    IColumn::Filter filter(output_block.rows(), 0);
                     filter[position] = 1;
                     writer_positions.insert({writer, std::move(filter)});
                     _partitions_to_writers.insert({partition_name, writer});
@@ -185,7 +196,7 @@ Status VHiveTableWriter::write(vectorized::Block& block) {
             }
             auto writer_pos_iter = writer_positions.find(writer);
             if (writer_pos_iter == writer_positions.end()) {
-                IColumn::Filter filter(block.rows(), 0);
+                IColumn::Filter filter(output_block.rows(), 0);
                 filter[i] = 1;
                 writer_positions.insert({writer, std::move(filter)});
             } else {
@@ -195,12 +206,39 @@ Status VHiveTableWriter::write(vectorized::Block& block) {
         }
     }
     SCOPED_RAW_TIMER(&_partition_writers_write_ns);
+    output_block.erase(_non_write_columns_indices);
     for (auto it = writer_positions.begin(); it != writer_positions.end(); ++it) {
-        RETURN_IF_ERROR(it->first->write(block, &it->second));
+        Block filtered_block;
+        RETURN_IF_ERROR(_filter_block(output_block, &it->second, &filtered_block));
+        RETURN_IF_ERROR(it->first->write(filtered_block));
     }
     return Status::OK();
 }
 
+Status VHiveTableWriter::_filter_block(doris::vectorized::Block& block,
+                                       const vectorized::IColumn::Filter* filter,
+                                       doris::vectorized::Block* output_block) {
+    const ColumnsWithTypeAndName& columns_with_type_and_name =
+            block.get_columns_with_type_and_name();
+    vectorized::ColumnsWithTypeAndName result_columns;
+    for (int i = 0; i < columns_with_type_and_name.size(); ++i) {
+        const auto& col = columns_with_type_and_name[i];
+        result_columns.emplace_back(col.column->clone_resized(col.column->size()), col.type,
+                                    col.name);
+    }
+    *output_block = {std::move(result_columns)};
+
+    std::vector<uint32_t> columns_to_filter;
+    int column_to_keep = output_block->columns();
+    columns_to_filter.resize(column_to_keep);
+    for (uint32_t i = 0; i < column_to_keep; ++i) {
+        columns_to_filter[i] = i;
+    }
+
+    Block::filter_block_internal(output_block, columns_to_filter, *filter);
+    return Status::OK();
+}
+
 Status VHiveTableWriter::close(Status status) {
     int64_t partitions_to_writers_size = _partitions_to_writers.size();
     {
@@ -312,11 +350,18 @@ std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer
     }
     _write_file_count++;
+    std::vector<std::string> column_names;
+    column_names.reserve(hive_table_sink.columns.size());
+    for (int i = 0; i < hive_table_sink.columns.size(); i++) {
+        if (_non_write_columns_indices.find(i) == _non_write_columns_indices.end()) {
+            column_names.emplace_back(hive_table_sink.columns[i].name);
+        }
+    }
     return std::make_shared<VHivePartitionWriter>(
-            _t_sink, std::move(partition_name), update_mode, _vec_output_expr_ctxs,
-            _write_output_vexpr_ctxs, _non_write_columns_indices, hive_table_sink.columns,
-            std::move(write_info), (file_name == nullptr) ? _compute_file_name() : *file_name,
-            file_name_index, file_format_type, write_compress_type, hive_table_sink.hadoop_config);
+            _t_sink, std::move(partition_name), update_mode, _write_output_vexpr_ctxs,
+            std::move(column_names), std::move(write_info),
+            (file_name == nullptr) ?
_compute_file_name() : *file_name, file_name_index, + file_format_type, write_compress_type, hive_table_sink.hadoop_config); } std::vector VHiveTableWriter::_create_partition_values(vectorized::Block& block, diff --git a/be/src/vec/sink/writer/vhive_table_writer.h b/be/src/vec/sink/writer/vhive_table_writer.h index 3a3f45a6db1c1d..4989ba443c7e20 100644 --- a/be/src/vec/sink/writer/vhive_table_writer.h +++ b/be/src/vec/sink/writer/vhive_table_writer.h @@ -20,6 +20,7 @@ #include #include "util/runtime_profile.h" +#include "vec/columns/column.h" #include "vec/exprs/vexpr_fwd.h" #include "vec/sink/writer/async_result_writer.h" @@ -62,6 +63,9 @@ class VHiveTableWriter final : public AsyncResultWriter { std::string _compute_file_name(); + Status _filter_block(doris::vectorized::Block& block, const vectorized::IColumn::Filter* filter, + doris::vectorized::Block* output_block); + // Currently it is a copy, maybe it is better to use move semantics to eliminate it. TDataSink _t_sink; RuntimeState* _state = nullptr; diff --git a/regression-test/data/external_table_p0/hive/ddl/test_hive_write_type.out b/regression-test/data/external_table_p0/hive/ddl/test_hive_write_type.out index 1f7d39971db63a..20d2758c2a64e3 100644 --- a/regression-test/data/external_table_p0/hive/ddl/test_hive_write_type.out +++ b/regression-test/data/external_table_p0/hive/ddl/test_hive_write_type.out @@ -8,6 +8,12 @@ a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d a b c d e 1.1 12345 0.12345678 string \N \N \N \N \N \N \N \N \N \N \N a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d"] ["string1", "string2"] [{1:"a"}, {2:"b"}] {1234567890123456789:"a"} {1234567890123456789:0.12345678} {"key":["char1", "char2"]} {"id": 1, "gender": 1, "name": "John Doe"} {"scale": 123.4567, "metric": ["metric1", "metric2"]} {"codes": [123, 456], "props": {"key1":["char1", "char2"]}} +-- !columns_out_of_order01 -- +3 6 1 4 2 5 + +-- !columns_out_of_order02 -- +1 2 3 4 5 6 + -- !complex_type01 -- a \N \N \N \N \N \N \N \N \N ["char1", "char2"] ["c", "d"] ["string1", "string2"] [{1:"a"}, {2:"b"}] {1234567890123456789:"a"} {1234567890123456789:0.12345678} {"key":["char1", "char2"]} {"id": 1, "gender": 1, "name": "John Doe"} {"scale": 123.4567, "metric": ["metric1", "metric2"]} \N a b c d e 1.1 12345 0.12345678 string \N \N \N \N \N \N \N \N \N \N \N @@ -17,6 +23,12 @@ a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d a b c d e 1.1 12345 0.12345678 string \N \N \N \N \N \N \N \N \N \N \N a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d"] ["string1", "string2"] [{1:"a"}, {2:"b"}] {1234567890123456789:"a"} {1234567890123456789:0.12345678} {"key":["char1", "char2"]} {"id": 1, "gender": 1, "name": "John Doe"} {"scale": 123.4567, "metric": ["metric1", "metric2"]} {"codes": [123, 456], "props": {"key1":["char1", "char2"]}} +-- !columns_out_of_order01 -- +3 6 1 4 2 5 + +-- !columns_out_of_order02 -- +1 2 3 4 5 6 + -- !complex_type01 -- a \N \N \N \N \N \N \N \N \N ["char1", "char2"] ["c", "d"] ["string1", "string2"] [{1:"a"}, {2:"b"}] {1234567890123456789:"a"} {1234567890123456789:0.12345678} {"key":["char1", "char2"]} {"id": 1, "gender": 1, "name": "John Doe"} {"scale": 123.4567, "metric": ["metric1", "metric2"]} \N a b c d e 1.1 12345 0.12345678 string \N \N \N \N \N \N \N \N \N \N \N @@ -26,6 +38,12 @@ a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d a b c d e 1.1 12345 0.12345678 string \N \N \N \N 
\N \N \N \N \N \N \N a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d"] ["string1", "string2"] [{1:"a"}, {2:"b"}] {1234567890123456789:"a"} {1234567890123456789:0.12345678} {"key":["char1", "char2"]} {"id": 1, "gender": 1, "name": "John Doe"} {"scale": 123.4567, "metric": ["metric1", "metric2"]} {"codes": [123, 456], "props": {"key1":["char1", "char2"]}} +-- !columns_out_of_order01 -- +3 6 1 4 2 5 + +-- !columns_out_of_order02 -- +1 2 3 4 5 6 + -- !complex_type01 -- a \N \N \N \N \N \N \N \N \N ["char1", "char2"] ["c", "d"] ["string1", "string2"] [{1:"a"}, {2:"b"}] {1234567890123456789:"a"} {1234567890123456789:0.12345678} {"key":["char1", "char2"]} {"id": 1, "gender": 1, "name": "John Doe"} {"scale": 123.4567, "metric": ["metric1", "metric2"]} \N a b c d e 1.1 12345 0.12345678 string \N \N \N \N \N \N \N \N \N \N \N @@ -34,3 +52,10 @@ a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d -- !complex_type02 -- a b c d e 1.1 12345 0.12345678 string \N \N \N \N \N \N \N \N \N \N \N a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d"] ["string1", "string2"] [{1:"a"}, {2:"b"}] {1234567890123456789:"a"} {1234567890123456789:0.12345678} {"key":["char1", "char2"]} {"id": 1, "gender": 1, "name": "John Doe"} {"scale": 123.4567, "metric": ["metric1", "metric2"]} {"codes": [123, 456], "props": {"key1":["char1", "char2"]}} + +-- !columns_out_of_order01 -- +3 6 1 4 2 5 + +-- !columns_out_of_order02 -- +1 2 3 4 5 6 + diff --git a/regression-test/suites/external_table_p0/hive/ddl/test_hive_write_type.groovy b/regression-test/suites/external_table_p0/hive/ddl/test_hive_write_type.groovy index cf3bd2a9037a48..0e1e1355afd3e0 100644 --- a/regression-test/suites/external_table_p0/hive/ddl/test_hive_write_type.groovy +++ b/regression-test/suites/external_table_p0/hive/ddl/test_hive_write_type.groovy @@ -266,6 +266,62 @@ suite("test_hive_write_type", "p0,external,hive,external_docker,external_docker_ sql """ DROP DATABASE ${catalog_name}.test_hive_ex """ } + def test_columns_out_of_order = { String file_format, String catalog_name -> + sql """ switch ${catalog_name} """ + sql """ create database if not exists `test_columns_out_of_order` """; + sql """ use `${catalog_name}`.`test_columns_out_of_order` """ + + sql """ drop table if exists columns_out_of_order_source_tbl_${file_format} """ + sql """ + CREATE TABLE columns_out_of_order_source_tbl_${file_format} ( + `col3` bigint, + `col6` int, + `col1` bigint, + `col4` int, + `col2` bigint, + `col5` int + ) ENGINE = hive + PROPERTIES ( + 'file_format'='${file_format}' + ) + """; + sql """ drop table if exists columns_out_of_order_target_tbl_${file_format} """ + sql """ + CREATE TABLE columns_out_of_order_target_tbl_${file_format} ( + `col1` bigint, + `col2` bigint, + `col3` bigint, + `col4` int, + `col5` int, + `col6` int + ) ENGINE = hive PARTITION BY LIST ( + col4, col5, col6 + )() + PROPERTIES ( + 'file_format'='${file_format}' + ) + """; + + sql """ + INSERT INTO columns_out_of_order_source_tbl_${file_format} ( + col1, col2, col3, col4, col5, col6 + ) VALUES (1, 2, 3, 4, 5, 6); + """ + order_qt_columns_out_of_order01 """ SELECT * FROM columns_out_of_order_source_tbl_${file_format} """ + + sql """ + INSERT INTO columns_out_of_order_target_tbl_${file_format} ( + col1, col2, col3, col4, col5, col6 + ) VALUES (1, 2, 3, 4, 5, 6); + """ + + order_qt_columns_out_of_order02 """ SELECT * FROM columns_out_of_order_target_tbl_${file_format} """ + + sql """ drop table 
columns_out_of_order_source_tbl_${file_format} """ + sql """ drop table columns_out_of_order_target_tbl_${file_format} """ + sql """ drop database if exists `test_columns_out_of_order` """; + } + try { String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort") String hdfs_port = context.config.otherConfigs.get(hivePrefix + "HdfsPort") @@ -285,6 +341,7 @@ suite("test_hive_write_type", "p0,external,hive,external_docker,external_docker_ logger.info("Process file format" + file_format) test_complex_type_tbl(file_format, catalog_name) test_insert_exception(file_format, catalog_name) + test_columns_out_of_order(file_format, catalog_name) } sql """drop catalog if exists ${catalog_name}""" } finally {