Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 15 additions & 54 deletions be/src/vec/sink/writer/vhive_partition_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,19 @@
namespace doris {
namespace vectorized {

VHivePartitionWriter::VHivePartitionWriter(
const TDataSink& t_sink, std::string partition_name, TUpdateMode::type update_mode,
const VExprContextSPtrs& output_expr_ctxs, const VExprContextSPtrs& write_output_expr_ctxs,
const std::set<size_t>& non_write_columns_indices, const std::vector<THiveColumn>& columns,
WriteInfo write_info, std::string file_name, int file_name_index,
TFileFormatType::type file_format_type, TFileCompressType::type hive_compress_type,
const std::map<std::string, std::string>& hadoop_conf)
VHivePartitionWriter::VHivePartitionWriter(const TDataSink& t_sink, std::string partition_name,
TUpdateMode::type update_mode,
const VExprContextSPtrs& write_output_expr_ctxs,
std::vector<std::string> write_column_names,
WriteInfo write_info, std::string file_name,
int file_name_index,
TFileFormatType::type file_format_type,
TFileCompressType::type hive_compress_type,
const std::map<std::string, std::string>& hadoop_conf)
: _partition_name(std::move(partition_name)),
_update_mode(update_mode),
_vec_output_expr_ctxs(output_expr_ctxs),
_write_output_expr_ctxs(write_output_expr_ctxs),
_non_write_columns_indices(non_write_columns_indices),
_columns(columns),
_write_column_names(std::move(write_column_names)),
_write_info(std::move(write_info)),
_file_name(std::move(file_name)),
_file_name_index(file_name_index),
Expand Down Expand Up @@ -74,14 +74,6 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
io::FileWriterOptions file_writer_options = {.used_by_s3_committer = true};
RETURN_IF_ERROR(_fs->create_file(file_description.path, &_file_writer, &file_writer_options));

std::vector<std::string> column_names;
column_names.reserve(_columns.size());
for (int i = 0; i < _columns.size(); i++) {
if (_non_write_columns_indices.find(i) == _non_write_columns_indices.end()) {
column_names.emplace_back(_columns[i].name);
}
}

switch (_file_format_type) {
case TFileFormatType::FORMAT_PARQUET: {
bool parquet_disable_dictionary = false;
Expand All @@ -105,7 +97,7 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
}
}
_file_format_transformer.reset(new VParquetTransformer(
state, _file_writer.get(), _write_output_expr_ctxs, std::move(column_names),
state, _file_writer.get(), _write_output_expr_ctxs, _write_column_names,
parquet_compression_type, parquet_disable_dictionary, TParquetVersion::PARQUET_1_0,
false));
return _file_format_transformer->open();
Expand Down Expand Up @@ -136,7 +128,7 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)

_file_format_transformer.reset(
new VOrcTransformer(state, _file_writer.get(), _write_output_expr_ctxs,
std::move(column_names), false, orc_compression_type));
_write_column_names, false, orc_compression_type));
return _file_format_transformer->open();
}
default: {
Expand Down Expand Up @@ -165,43 +157,12 @@ Status VHivePartitionWriter::close(const Status& status) {
return Status::OK();
}

Status VHivePartitionWriter::write(vectorized::Block& block, vectorized::IColumn::Filter* filter) {
Block output_block;
RETURN_IF_ERROR(_projection_and_filter_block(block, filter, &output_block));
RETURN_IF_ERROR(_file_format_transformer->write(output_block));
_row_count += output_block.rows();
// Hands `block` to the file format transformer and, on success, adds the block's row
// count to this partition writer's running total.
//
// A failing transformer status is returned unchanged; in that case the row count is
// not updated.
Status VHivePartitionWriter::write(vectorized::Block& block) {
    Status write_status = _file_format_transformer->write(block);
    if (!write_status.ok()) {
        return write_status;
    }
    _row_count += block.rows();
    return Status::OK();
}

// Evaluates the output expressions (_vec_output_expr_ctxs) over input_block into
// output_block and materializes the result. When a filter is supplied, the row filter is
// then applied and the non-write columns are erased from output_block.
//
// input_block:  block to project; if it has no rows the function returns OK immediately
//               and output_block is not touched.
// filter:       optional row filter; when nullptr, neither the filtering nor the
//               erase of _non_write_columns_indices below is performed.
// output_block: receives the projected (and, with a filter, filtered and trimmed) columns.
// Returns OK, or the error propagated by RETURN_IF_ERROR from the expression evaluation.
Status VHivePartitionWriter::_projection_and_filter_block(doris::vectorized::Block& input_block,
const vectorized::IColumn::Filter* filter,
doris::vectorized::Block* output_block) {
Status status = Status::OK();
if (input_block.rows() == 0) {
return status;
}
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
_vec_output_expr_ctxs, input_block, output_block, true));
materialize_block_inplace(*output_block);

// No filter: return the projected block as-is. Note that this path also skips the
// erase of non-write columns at the end of the function.
if (filter == nullptr) {
return status;
}

// Build the identity index list [0, column_to_keep) of columns to run the filter on.
// NOTE(review): the count is taken from input_block while the filter is applied to
// output_block -- confirm both blocks always have the same number of columns.
std::vector<uint32_t> columns_to_filter;
int column_to_keep = input_block.columns();
columns_to_filter.resize(column_to_keep);
for (uint32_t i = 0; i < column_to_keep; ++i) {
columns_to_filter[i] = i;
}

// Apply the row filter to the listed columns of output_block.
Block::filter_block_internal(output_block, columns_to_filter, *filter);

// Drop the columns that must not be written to the file; done after filtering, so the
// indices refer to the column layout before any column was removed.
output_block->erase(_non_write_columns_indices);

return status;
}

THivePartitionUpdate VHivePartitionWriter::_build_partition_update() {
THivePartitionUpdate hive_partition_update;
hive_partition_update.__set_name(_partition_name);
Expand Down
16 changes: 5 additions & 11 deletions be/src/vec/sink/writer/vhive_partition_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,9 @@ class VHivePartitionWriter {
};

VHivePartitionWriter(const TDataSink& t_sink, std::string partition_name,
TUpdateMode::type update_mode, const VExprContextSPtrs& output_expr_ctxs,
TUpdateMode::type update_mode,
const VExprContextSPtrs& write_output_expr_ctxs,
const std::set<size_t>& non_write_columns_indices,
const std::vector<THiveColumn>& columns, WriteInfo write_info,
std::vector<std::string> write_column_names, WriteInfo write_info,
std::string file_name, int file_name_index,
TFileFormatType::type file_format_type,
TFileCompressType::type hive_compress_type,
Expand All @@ -62,7 +61,7 @@ class VHivePartitionWriter {

Status open(RuntimeState* state, RuntimeProfile* profile);

Status write(vectorized::Block& block, IColumn::Filter* filter = nullptr);
Status write(vectorized::Block& block);

Status close(const Status& status);

Expand All @@ -76,10 +75,6 @@ class VHivePartitionWriter {
std::string _get_target_file_name();

private:
Status _projection_and_filter_block(doris::vectorized::Block& input_block,
const vectorized::IColumn::Filter* filter,
doris::vectorized::Block* output_block);

THivePartitionUpdate _build_partition_update();

std::string _get_file_extension(TFileFormatType::type file_format_type,
Expand All @@ -93,11 +88,10 @@ class VHivePartitionWriter {

size_t _row_count = 0;

const VExprContextSPtrs& _vec_output_expr_ctxs;
const VExprContextSPtrs& _write_output_expr_ctxs;
const std::set<size_t>& _non_write_columns_indices;

const std::vector<THiveColumn>& _columns;
std::vector<std::string> _write_column_names;

WriteInfo _write_info;
std::string _file_name;
int _file_name_index;
Expand Down
75 changes: 60 additions & 15 deletions be/src/vec/sink/writer/vhive_table_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "runtime/runtime_state.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/materialize_block.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/sink/writer/vhive_partition_writer.h"
Expand Down Expand Up @@ -82,8 +83,17 @@ Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {

Status VHiveTableWriter::write(vectorized::Block& block) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'write' has cognitive complexity of 83 (threshold 50) [readability-function-cognitive-complexity]

Status VHiveTableWriter::write(vectorized::Block& block) {
                         ^
Additional context

be/src/vec/sink/writer/vhive_table_writer.cpp:85: +1, including nesting penalty of 0, nesting level increased to 1

    if (block.rows() == 0) {
    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:89: +1, including nesting penalty of 0, nesting level increased to 1

    RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
    ^

be/src/common/status.h:614: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:89: +2, including nesting penalty of 1, nesting level increased to 2

    RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
    ^

be/src/common/status.h:616: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/vec/sink/writer/vhive_table_writer.cpp:97: +1, including nesting penalty of 0, nesting level increased to 1

    if (_partition_columns_input_index.empty()) {
    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:102: +2, including nesting penalty of 1, nesting level increased to 2

            if (writer_iter == _partitions_to_writers.end()) {
            ^

be/src/vec/sink/writer/vhive_table_writer.cpp:105: +3, including nesting penalty of 2, nesting level increased to 3

                } catch (doris::Exception& e) {
                  ^

be/src/vec/sink/writer/vhive_table_writer.cpp:109: +3, including nesting penalty of 2, nesting level increased to 3

                RETURN_IF_ERROR(writer->open(_state, _profile));
                ^

be/src/common/status.h:614: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:109: +4, including nesting penalty of 3, nesting level increased to 4

                RETURN_IF_ERROR(writer->open(_state, _profile));
                ^

be/src/common/status.h:616: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/vec/sink/writer/vhive_table_writer.cpp:110: +1, nesting level increased to 2

            } else {
              ^

be/src/vec/sink/writer/vhive_table_writer.cpp:111: +3, including nesting penalty of 2, nesting level increased to 3

                if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
                ^

be/src/vec/sink/writer/vhive_table_writer.cpp:122: +4, including nesting penalty of 3, nesting level increased to 4

                    } catch (doris::Exception& e) {
                      ^

be/src/vec/sink/writer/vhive_table_writer.cpp:126: +4, including nesting penalty of 3, nesting level increased to 4

                    RETURN_IF_ERROR(writer->open(_state, _profile));
                    ^

be/src/common/status.h:614: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:126: +5, including nesting penalty of 4, nesting level increased to 5

                    RETURN_IF_ERROR(writer->open(_state, _profile));
                    ^

be/src/common/status.h:616: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/vec/sink/writer/vhive_table_writer.cpp:127: +1, nesting level increased to 3

                } else {
                  ^

be/src/vec/sink/writer/vhive_table_writer.cpp:134: +2, including nesting penalty of 1, nesting level increased to 2

        RETURN_IF_ERROR(writer->write(output_block));
        ^

be/src/common/status.h:614: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:134: +3, including nesting penalty of 2, nesting level increased to 3

        RETURN_IF_ERROR(writer->write(output_block));
        ^

be/src/common/status.h:616: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/vec/sink/writer/vhive_table_writer.cpp:140: +1, including nesting penalty of 0, nesting level increased to 1

        for (int i = 0; i < output_block.rows(); ++i) {
        ^

be/src/vec/sink/writer/vhive_table_writer.cpp:144: +2, including nesting penalty of 1, nesting level increased to 2

            } catch (doris::Exception& e) {
              ^

be/src/vec/sink/writer/vhive_table_writer.cpp:151: nesting level increased to 2

                    [&](const std::string& partition_name, int position,
                    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:157: +3, including nesting penalty of 2, nesting level increased to 3

                    RETURN_IF_ERROR(writer->open(_state, _profile));
                    ^

be/src/common/status.h:614: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:157: +4, including nesting penalty of 3, nesting level increased to 4

                    RETURN_IF_ERROR(writer->open(_state, _profile));
                    ^

be/src/common/status.h:616: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/vec/sink/writer/vhive_table_writer.cpp:163: +3, including nesting penalty of 2, nesting level increased to 3

                } catch (doris::Exception& e) {
                  ^

be/src/vec/sink/writer/vhive_table_writer.cpp:170: +2, including nesting penalty of 1, nesting level increased to 2

            if (writer_iter == _partitions_to_writers.end()) {
            ^

be/src/vec/sink/writer/vhive_table_writer.cpp:172: +3, including nesting penalty of 2, nesting level increased to 3

                if (_partitions_to_writers.size() + 1 >
                ^

be/src/vec/sink/writer/vhive_table_writer.cpp:178: +3, including nesting penalty of 2, nesting level increased to 3

                RETURN_IF_ERROR(create_and_open_writer(partition_name, i, nullptr, 0, writer));
                ^

be/src/common/status.h:614: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:178: +4, including nesting penalty of 3, nesting level increased to 4

                RETURN_IF_ERROR(create_and_open_writer(partition_name, i, nullptr, 0, writer));
                ^

be/src/common/status.h:616: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/vec/sink/writer/vhive_table_writer.cpp:179: +1, nesting level increased to 2

            } else {
              ^

be/src/vec/sink/writer/vhive_table_writer.cpp:181: +3, including nesting penalty of 2, nesting level increased to 3

                if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
                ^

be/src/vec/sink/writer/vhive_table_writer.cpp:190: +4, including nesting penalty of 3, nesting level increased to 4

                    RETURN_IF_ERROR(create_and_open_writer(partition_name, i, &file_name,
                    ^

be/src/common/status.h:614: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:190: +5, including nesting penalty of 4, nesting level increased to 5

                    RETURN_IF_ERROR(create_and_open_writer(partition_name, i, &file_name,
                    ^

be/src/common/status.h:616: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/vec/sink/writer/vhive_table_writer.cpp:192: +1, nesting level increased to 3

                } else {
                  ^

be/src/vec/sink/writer/vhive_table_writer.cpp:196: +3, including nesting penalty of 2, nesting level increased to 3

                if (writer_pos_iter == writer_positions.end()) {
                ^

be/src/vec/sink/writer/vhive_table_writer.cpp:200: +1, nesting level increased to 3

                } else {
                  ^

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'write' has cognitive complexity of 83 (threshold 50) [readability-function-cognitive-complexity]

Status VHiveTableWriter::write(vectorized::Block& block) {
                         ^
Additional context

be/src/vec/sink/writer/vhive_table_writer.cpp:86: +1, including nesting penalty of 0, nesting level increased to 1

    if (block.rows() == 0) {
    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:90: +1, including nesting penalty of 0, nesting level increased to 1

    RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
    ^

be/src/common/status.h:614: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:90: +2, including nesting penalty of 1, nesting level increased to 2

    RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
    ^

be/src/common/status.h:616: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/vec/sink/writer/vhive_table_writer.cpp:98: +1, including nesting penalty of 0, nesting level increased to 1

    if (_partition_columns_input_index.empty()) {
    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:103: +2, including nesting penalty of 1, nesting level increased to 2

            if (writer_iter == _partitions_to_writers.end()) {
            ^

be/src/vec/sink/writer/vhive_table_writer.cpp:106: +3, including nesting penalty of 2, nesting level increased to 3

                } catch (doris::Exception& e) {
                  ^

be/src/vec/sink/writer/vhive_table_writer.cpp:110: +3, including nesting penalty of 2, nesting level increased to 3

                RETURN_IF_ERROR(writer->open(_state, _profile));
                ^

be/src/common/status.h:614: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:110: +4, including nesting penalty of 3, nesting level increased to 4

                RETURN_IF_ERROR(writer->open(_state, _profile));
                ^

be/src/common/status.h:616: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/vec/sink/writer/vhive_table_writer.cpp:111: +1, nesting level increased to 2

            } else {
              ^

be/src/vec/sink/writer/vhive_table_writer.cpp:112: +3, including nesting penalty of 2, nesting level increased to 3

                if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
                ^

be/src/vec/sink/writer/vhive_table_writer.cpp:123: +4, including nesting penalty of 3, nesting level increased to 4

                    } catch (doris::Exception& e) {
                      ^

be/src/vec/sink/writer/vhive_table_writer.cpp:127: +4, including nesting penalty of 3, nesting level increased to 4

                    RETURN_IF_ERROR(writer->open(_state, _profile));
                    ^

be/src/common/status.h:614: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:127: +5, including nesting penalty of 4, nesting level increased to 5

                    RETURN_IF_ERROR(writer->open(_state, _profile));
                    ^

be/src/common/status.h:616: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/vec/sink/writer/vhive_table_writer.cpp:128: +1, nesting level increased to 3

                } else {
                  ^

be/src/vec/sink/writer/vhive_table_writer.cpp:135: +2, including nesting penalty of 1, nesting level increased to 2

        RETURN_IF_ERROR(writer->write(output_block));
        ^

be/src/common/status.h:614: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:135: +3, including nesting penalty of 2, nesting level increased to 3

        RETURN_IF_ERROR(writer->write(output_block));
        ^

be/src/common/status.h:616: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/vec/sink/writer/vhive_table_writer.cpp:141: +1, including nesting penalty of 0, nesting level increased to 1

        for (int i = 0; i < output_block.rows(); ++i) {
        ^

be/src/vec/sink/writer/vhive_table_writer.cpp:145: +2, including nesting penalty of 1, nesting level increased to 2

            } catch (doris::Exception& e) {
              ^

be/src/vec/sink/writer/vhive_table_writer.cpp:152: nesting level increased to 2

                    [&](const std::string& partition_name, int position,
                    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:158: +3, including nesting penalty of 2, nesting level increased to 3

                    RETURN_IF_ERROR(writer->open(_state, _profile));
                    ^

be/src/common/status.h:614: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:158: +4, including nesting penalty of 3, nesting level increased to 4

                    RETURN_IF_ERROR(writer->open(_state, _profile));
                    ^

be/src/common/status.h:616: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/vec/sink/writer/vhive_table_writer.cpp:164: +3, including nesting penalty of 2, nesting level increased to 3

                } catch (doris::Exception& e) {
                  ^

be/src/vec/sink/writer/vhive_table_writer.cpp:171: +2, including nesting penalty of 1, nesting level increased to 2

            if (writer_iter == _partitions_to_writers.end()) {
            ^

be/src/vec/sink/writer/vhive_table_writer.cpp:173: +3, including nesting penalty of 2, nesting level increased to 3

                if (_partitions_to_writers.size() + 1 >
                ^

be/src/vec/sink/writer/vhive_table_writer.cpp:179: +3, including nesting penalty of 2, nesting level increased to 3

                RETURN_IF_ERROR(create_and_open_writer(partition_name, i, nullptr, 0, writer));
                ^

be/src/common/status.h:614: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:179: +4, including nesting penalty of 3, nesting level increased to 4

                RETURN_IF_ERROR(create_and_open_writer(partition_name, i, nullptr, 0, writer));
                ^

be/src/common/status.h:616: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/vec/sink/writer/vhive_table_writer.cpp:180: +1, nesting level increased to 2

            } else {
              ^

be/src/vec/sink/writer/vhive_table_writer.cpp:182: +3, including nesting penalty of 2, nesting level increased to 3

                if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
                ^

be/src/vec/sink/writer/vhive_table_writer.cpp:191: +4, including nesting penalty of 3, nesting level increased to 4

                    RETURN_IF_ERROR(create_and_open_writer(partition_name, i, &file_name,
                    ^

be/src/common/status.h:614: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/vec/sink/writer/vhive_table_writer.cpp:191: +5, including nesting penalty of 4, nesting level increased to 5

                    RETURN_IF_ERROR(create_and_open_writer(partition_name, i, &file_name,
                    ^

be/src/common/status.h:616: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/vec/sink/writer/vhive_table_writer.cpp:193: +1, nesting level increased to 3

                } else {
                  ^

be/src/vec/sink/writer/vhive_table_writer.cpp:197: +3, including nesting penalty of 2, nesting level increased to 3

                if (writer_pos_iter == writer_positions.end()) {
                ^

be/src/vec/sink/writer/vhive_table_writer.cpp:201: +1, nesting level increased to 3

                } else {
                  ^

SCOPED_RAW_TIMER(&_send_data_ns);

if (block.rows() == 0) {
return Status::OK();
}
Block output_block;
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
_vec_output_expr_ctxs, block, &output_block, false));
materialize_block_inplace(output_block);

std::unordered_map<std::shared_ptr<VHivePartitionWriter>, IColumn::Filter> writer_positions;
_row_count += block.rows();
_row_count += output_block.rows();
auto& hive_table_sink = _t_sink.hive_table_sink;

if (_partition_columns_input_index.empty()) {
Expand All @@ -93,7 +103,7 @@ Status VHiveTableWriter::write(vectorized::Block& block) {
auto writer_iter = _partitions_to_writers.find("");
if (writer_iter == _partitions_to_writers.end()) {
try {
writer = _create_partition_writer(block, -1);
writer = _create_partition_writer(output_block, -1);
} catch (doris::Exception& e) {
return e.to_status();
}
Expand All @@ -109,7 +119,7 @@ Status VHiveTableWriter::write(vectorized::Block& block) {
}
_partitions_to_writers.erase(writer_iter);
try {
writer = _create_partition_writer(block, -1, &file_name,
writer = _create_partition_writer(output_block, -1, &file_name,
file_name_index + 1);
} catch (doris::Exception& e) {
return e.to_status();
Expand All @@ -122,16 +132,17 @@ Status VHiveTableWriter::write(vectorized::Block& block) {
}
}
SCOPED_RAW_TIMER(&_partition_writers_write_ns);
RETURN_IF_ERROR(writer->write(block));
output_block.erase(_non_write_columns_indices);
RETURN_IF_ERROR(writer->write(output_block));
return Status::OK();
}

{
SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);
for (int i = 0; i < block.rows(); ++i) {
for (int i = 0; i < output_block.rows(); ++i) {
std::vector<std::string> partition_values;
try {
partition_values = _create_partition_values(block, i);
partition_values = _create_partition_values(output_block, i);
} catch (doris::Exception& e) {
return e.to_status();
}
Expand All @@ -143,10 +154,10 @@ Status VHiveTableWriter::write(vectorized::Block& block) {
const std::string* file_name, int file_name_index,
std::shared_ptr<VHivePartitionWriter>& writer_ptr) -> Status {
try {
auto writer =
_create_partition_writer(block, position, file_name, file_name_index);
auto writer = _create_partition_writer(output_block, position, file_name,
file_name_index);
RETURN_IF_ERROR(writer->open(_state, _profile));
IColumn::Filter filter(block.rows(), 0);
IColumn::Filter filter(output_block.rows(), 0);
filter[position] = 1;
writer_positions.insert({writer, std::move(filter)});
_partitions_to_writers.insert({partition_name, writer});
Expand Down Expand Up @@ -185,7 +196,7 @@ Status VHiveTableWriter::write(vectorized::Block& block) {
}
auto writer_pos_iter = writer_positions.find(writer);
if (writer_pos_iter == writer_positions.end()) {
IColumn::Filter filter(block.rows(), 0);
IColumn::Filter filter(output_block.rows(), 0);
filter[i] = 1;
writer_positions.insert({writer, std::move(filter)});
} else {
Expand All @@ -195,12 +206,39 @@ Status VHiveTableWriter::write(vectorized::Block& block) {
}
}
SCOPED_RAW_TIMER(&_partition_writers_write_ns);
output_block.erase(_non_write_columns_indices);
for (auto it = writer_positions.begin(); it != writer_positions.end(); ++it) {
RETURN_IF_ERROR(it->first->write(block, &it->second));
Block filtered_block;
RETURN_IF_ERROR(_filter_block(output_block, &it->second, &filtered_block));
RETURN_IF_ERROR(it->first->write(filtered_block));
}
return Status::OK();
}

// Builds in *output_block a filtered copy of `block`.
//
// Every column of `block` is cloned at its current size (keeping type and name), so the
// source block itself is not filtered and can be reused for the next partition writer.
// The row filter is then applied to all columns of the copy.
//
// block:        source block to copy from.
// filter:       row filter; dereferenced unconditionally, so it must not be null.
//               Presumably it needs one entry per row of `block` -- TODO confirm against
//               Block::filter_block_internal.
// output_block: overwritten with the filtered copy.
// Always returns Status::OK().
Status VHiveTableWriter::_filter_block(doris::vectorized::Block& block,
                                       const vectorized::IColumn::Filter* filter,
                                       doris::vectorized::Block* output_block) {
    const ColumnsWithTypeAndName& source_columns = block.get_columns_with_type_and_name();
    vectorized::ColumnsWithTypeAndName copied_columns;
    // One allocation up front instead of regrowing on every emplace_back.
    copied_columns.reserve(source_columns.size());
    for (const auto& col : source_columns) {
        copied_columns.emplace_back(col.column->clone_resized(col.column->size()), col.type,
                                    col.name);
    }
    *output_block = {std::move(copied_columns)};

    // Identity index list: the filter is applied to every column of the copy. The count is
    // kept unsigned so the loop comparison does not mix signed and unsigned operands.
    const auto column_count = static_cast<uint32_t>(output_block->columns());
    std::vector<uint32_t> columns_to_filter(column_count);
    for (uint32_t i = 0; i < column_count; ++i) {
        columns_to_filter[i] = i;
    }

    Block::filter_block_internal(output_block, columns_to_filter, *filter);
    return Status::OK();
}

Status VHiveTableWriter::close(Status status) {
int64_t partitions_to_writers_size = _partitions_to_writers.size();
{
Expand Down Expand Up @@ -311,11 +349,18 @@ std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer
}

_write_file_count++;
std::vector<std::string> column_names;
column_names.reserve(hive_table_sink.columns.size());
for (int i = 0; i < hive_table_sink.columns.size(); i++) {
if (_non_write_columns_indices.find(i) == _non_write_columns_indices.end()) {
column_names.emplace_back(hive_table_sink.columns[i].name);
}
}
return std::make_shared<VHivePartitionWriter>(
_t_sink, std::move(partition_name), update_mode, _vec_output_expr_ctxs,
_write_output_vexpr_ctxs, _non_write_columns_indices, hive_table_sink.columns,
std::move(write_info), (file_name == nullptr) ? _compute_file_name() : *file_name,
file_name_index, file_format_type, write_compress_type, hive_table_sink.hadoop_config);
_t_sink, std::move(partition_name), update_mode, _write_output_vexpr_ctxs,
std::move(column_names), std::move(write_info),
(file_name == nullptr) ? _compute_file_name() : *file_name, file_name_index,
file_format_type, write_compress_type, hive_table_sink.hadoop_config);
}

std::vector<std::string> VHiveTableWriter::_create_partition_values(vectorized::Block& block,
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/sink/writer/vhive_table_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <gen_cpp/DataSinks_types.h>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: 'gen_cpp/DataSinks_types.h' file not found [clang-diagnostic-error]

#include <gen_cpp/DataSinks_types.h>
         ^


#include "util/runtime_profile.h"
#include "vec/columns/column.h"
#include "vec/exprs/vexpr_fwd.h"
#include "vec/sink/writer/async_result_writer.h"

Expand Down Expand Up @@ -62,6 +63,9 @@ class VHiveTableWriter final : public AsyncResultWriter {

std::string _compute_file_name();

Status _filter_block(doris::vectorized::Block& block, const vectorized::IColumn::Filter* filter,
doris::vectorized::Block* output_block);

// Currently this is stored as a copy; it may be better to use move semantics to avoid the copy.
TDataSink _t_sink;
RuntimeState* _state = nullptr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d
a b c d e 1.1 12345 0.12345678 string \N \N \N \N \N \N \N \N \N \N \N
a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d"] ["string1", "string2"] [{1:"a"}, {2:"b"}] {1234567890123456789:"a"} {1234567890123456789:0.12345678} {"key":["char1", "char2"]} {"id": 1, "gender": 1, "name": "John Doe"} {"scale": 123.4567, "metric": ["metric1", "metric2"]} {"codes": [123, 456], "props": {"key1":["char1", "char2"]}}

-- !columns_out_of_order01 --
3 6 1 4 2 5

-- !columns_out_of_order02 --
1 2 3 4 5 6

-- !complex_type01 --
a \N \N \N \N \N \N \N \N \N ["char1", "char2"] ["c", "d"] ["string1", "string2"] [{1:"a"}, {2:"b"}] {1234567890123456789:"a"} {1234567890123456789:0.12345678} {"key":["char1", "char2"]} {"id": 1, "gender": 1, "name": "John Doe"} {"scale": 123.4567, "metric": ["metric1", "metric2"]} \N
a b c d e 1.1 12345 0.12345678 string \N \N \N \N \N \N \N \N \N \N \N
Expand All @@ -17,6 +23,12 @@ a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d
a b c d e 1.1 12345 0.12345678 string \N \N \N \N \N \N \N \N \N \N \N
a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d"] ["string1", "string2"] [{1:"a"}, {2:"b"}] {1234567890123456789:"a"} {1234567890123456789:0.12345678} {"key":["char1", "char2"]} {"id": 1, "gender": 1, "name": "John Doe"} {"scale": 123.4567, "metric": ["metric1", "metric2"]} {"codes": [123, 456], "props": {"key1":["char1", "char2"]}}

-- !columns_out_of_order01 --
3 6 1 4 2 5

-- !columns_out_of_order02 --
1 2 3 4 5 6

-- !complex_type01 --
a \N \N \N \N \N \N \N \N \N ["char1", "char2"] ["c", "d"] ["string1", "string2"] [{1:"a"}, {2:"b"}] {1234567890123456789:"a"} {1234567890123456789:0.12345678} {"key":["char1", "char2"]} {"id": 1, "gender": 1, "name": "John Doe"} {"scale": 123.4567, "metric": ["metric1", "metric2"]} \N
a b c d e 1.1 12345 0.12345678 string \N \N \N \N \N \N \N \N \N \N \N
Expand All @@ -26,6 +38,12 @@ a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d
a b c d e 1.1 12345 0.12345678 string \N \N \N \N \N \N \N \N \N \N \N
a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d"] ["string1", "string2"] [{1:"a"}, {2:"b"}] {1234567890123456789:"a"} {1234567890123456789:0.12345678} {"key":["char1", "char2"]} {"id": 1, "gender": 1, "name": "John Doe"} {"scale": 123.4567, "metric": ["metric1", "metric2"]} {"codes": [123, 456], "props": {"key1":["char1", "char2"]}}

-- !columns_out_of_order01 --
3 6 1 4 2 5

-- !columns_out_of_order02 --
1 2 3 4 5 6

-- !complex_type01 --
a \N \N \N \N \N \N \N \N \N ["char1", "char2"] ["c", "d"] ["string1", "string2"] [{1:"a"}, {2:"b"}] {1234567890123456789:"a"} {1234567890123456789:0.12345678} {"key":["char1", "char2"]} {"id": 1, "gender": 1, "name": "John Doe"} {"scale": 123.4567, "metric": ["metric1", "metric2"]} \N
a b c d e 1.1 12345 0.12345678 string \N \N \N \N \N \N \N \N \N \N \N
Expand All @@ -34,3 +52,10 @@ a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d
-- !complex_type02 --
a b c d e 1.1 12345 0.12345678 string \N \N \N \N \N \N \N \N \N \N \N
a b c d e 1.1 12345 0.12345678 string [0.001, 0.002] ["char1", "char2"] ["c", "d"] ["string1", "string2"] [{1:"a"}, {2:"b"}] {1234567890123456789:"a"} {1234567890123456789:0.12345678} {"key":["char1", "char2"]} {"id": 1, "gender": 1, "name": "John Doe"} {"scale": 123.4567, "metric": ["metric1", "metric2"]} {"codes": [123, 456], "props": {"key1":["char1", "char2"]}}

-- !columns_out_of_order01 --
3 6 1 4 2 5

-- !columns_out_of_order02 --
1 2 3 4 5 6

Loading