diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index 7dea5d99617416..b7666bda68874a 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -76,13 +76,8 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma const TFileScanRangeParams& params, const TFileRangeDesc& range, ShardedKVCache* kv_cache, io::IOContext* io_ctx) - : TableFormatReader(std::move(file_format_reader)), - _profile(profile), - _state(state), - _params(params), - _range(range), - _kv_cache(kv_cache), - _io_ctx(io_ctx) { + : TableFormatReader(std::move(file_format_reader), state, profile, params, range, io_ctx), + _kv_cache(kv_cache) { static const char* iceberg_profile = "IcebergProfile"; ADD_TIMER(_profile, iceberg_profile); _iceberg_profile.num_delete_files = @@ -93,31 +88,9 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", iceberg_profile); _iceberg_profile.delete_rows_sort_time = ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile); - if (range.table_format_params.iceberg_params.__isset.row_count) { - _remaining_table_level_row_count = range.table_format_params.iceberg_params.row_count; - } else { - _remaining_table_level_row_count = -1; - } } -Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { - // already get rows from be - if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count > 0) { - auto rows = std::min(_remaining_table_level_row_count, - (int64_t)_state->query_options().batch_size); - _remaining_table_level_row_count -= rows; - auto mutate_columns = block->mutate_columns(); - for (auto& col : mutate_columns) { - col->resize(rows); - } - block->set_columns(std::move(mutate_columns)); - *read_rows = rows; - if (_remaining_table_level_row_count == 0) { - *eof = true; - } - - return Status::OK(); - } +Status IcebergTableReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) { RETURN_IF_ERROR(_expand_block_if_need(block)); // To support iceberg schema evolution.
We change the column name in block to @@ -160,14 +133,14 @@ Status IcebergTableReader::get_columns( return _file_format_reader->get_columns(name_to_type, missing_cols); } -Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) { +Status IcebergTableReader::init_row_filters() { // We get the count value by doris's be, so we don't need to read the delete file - if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count > 0) { + if (_push_down_agg_type == TPushAggOp::type::COUNT && _table_level_row_count > 0) { return Status::OK(); } - auto& table_desc = range.table_format_params.iceberg_params; - auto& version = table_desc.format_version; + const auto& table_desc = _range.table_format_params.iceberg_params; + const auto& version = table_desc.format_version; if (version < MIN_SUPPORT_DELETE_FILES_VERSION) { return Status::OK(); } @@ -547,7 +520,7 @@ Status IcebergParquetReader::init_reader( _gen_new_colname_to_value_range(); parquet_reader->set_table_to_file_col_map(_table_col_to_file_col); parquet_reader->iceberg_sanitize(_all_required_col_names); - RETURN_IF_ERROR(init_row_filters(_range, _io_ctx)); + RETURN_IF_ERROR(init_row_filters()); return parquet_reader->init_reader( _all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range, conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id, @@ -619,7 +592,7 @@ Status IcebergOrcReader::init_reader( _gen_file_col_names(); _gen_new_colname_to_value_range(); orc_reader->set_table_col_to_file_col(_table_col_to_file_col); - RETURN_IF_ERROR(init_row_filters(_range, _io_ctx)); + RETURN_IF_ERROR(init_row_filters()); return orc_reader->init_reader(&_all_required_col_names, &_new_colname_to_value_range, conjuncts, false, tuple_descriptor, row_descriptor, not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts); diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h index b057cb0657aa24..48236021ce9432 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.h +++ b/be/src/vec/exec/format/table/iceberg_reader.h @@ -30,10 +30,8 @@ #include "exec/olap_common.h" #include "runtime/define_primitive_type.h" #include "runtime/primitive_type.h" -#include "runtime/runtime_state.h" #include "runtime/types.h" #include "table_format_reader.h" -#include "util/runtime_profile.h" #include "vec/columns/column_dictionary.h" #include "vec/exec/format/orc/vorc_reader.h" #include "vec/exec/format/parquet/vparquet_reader.h" @@ -80,9 +78,9 @@ class IcebergTableReader : public TableFormatReader { io::IOContext* io_ctx); ~IcebergTableReader() override = default; - Status init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) final; + Status init_row_filters() final; - Status get_next_block(Block* block, size_t* read_rows, bool* eof) final; + Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final; Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, std::unordered_set<std::string>* missing_cols) final; @@ -136,10 +134,6 @@ class IcebergTableReader : public TableFormatReader { // Remove the added delete columns Status _shrink_block_if_need(Block* block); - RuntimeProfile* _profile; - RuntimeState* _state; - const TFileScanRangeParams& _params; - const TFileRangeDesc& _range; // owned by scan node ShardedKVCache* _kv_cache; IcebergProfile _iceberg_profile; @@ -163,13 +157,9 @@ class IcebergTableReader : public TableFormatReader { std::vector<std::string> _expand_col_names; std::vector _expand_columns; - io::IOContext* 
_io_ctx; bool _has_schema_change = false; bool _has_iceberg_schema = false; - // the table level row count for optimizing query like: - // select count(*) from table; - int64_t _remaining_table_level_row_count; Fileformat _file_format = Fileformat::NONE; const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2; @@ -213,9 +203,9 @@ class IcebergParquetReader final : public IcebergTableReader { const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts); Status _read_position_delete_file(const TFileRangeDesc* delete_range, - DeleteFile* position_delete) override; + DeleteFile* position_delete) final; - void set_delete_rows() override { + void set_delete_rows() final { auto* parquet_reader = (ParquetReader*)(_file_format_reader.get()); parquet_reader->set_delete_rows(&_iceberg_delete_rows); } @@ -224,7 +214,7 @@ protected: std::unique_ptr<GenericReader> _create_equality_reader( - const TFileRangeDesc& delete_desc) override { + const TFileRangeDesc& delete_desc) final { return ParquetReader::create_unique( _profile, _params, delete_desc, READ_DELETE_FILE_BATCH_SIZE, const_cast<cctz::time_zone*>(&_state->timezone_obj()), _io_ctx, _state); @@ -235,7 +225,7 @@ class IcebergOrcReader final : public IcebergTableReader { ENABLE_FACTORY_CREATOR(IcebergOrcReader); Status _read_position_delete_file(const TFileRangeDesc* delete_range, - DeleteFile* position_delete) override; + DeleteFile* position_delete) final; IcebergOrcReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params, @@ -243,7 +233,7 @@ : IcebergTableReader(std::move(file_format_reader), profile, state, params, range, kv_cache, io_ctx) {} - void set_delete_rows() override { + void set_delete_rows() final { auto* orc_reader = (OrcReader*)_file_format_reader.get(); orc_reader->set_position_delete_rowids(&_iceberg_delete_rows); } diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_jni_reader.cpp index 71bb496d30183d..e5d997e281a061 100644 --- a/be/src/vec/exec/format/table/paimon_jni_reader.cpp +++ b/be/src/vec/exec/format/table/paimon_jni_reader.cpp @@ -21,9 +21,9 @@ #include #include "runtime/descriptors.h" +#include "runtime/runtime_state.h" #include "runtime/types.h" #include "vec/core/types.h" - namespace doris { class RuntimeProfile; class RuntimeState; @@ -65,6 +65,11 @@ PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_d if (range_params->__isset.serialized_table) { params["serialized_table"] = range_params->serialized_table; } + if (range.table_format_params.__isset.table_level_row_count) { + _remaining_table_level_row_count = range.table_format_params.table_level_row_count; + } else { + _remaining_table_level_row_count = -1; + } // Used to create paimon option for (auto& kv : range.table_format_params.paimon_params.paimon_options) { @@ -81,6 +86,22 @@ } Status PaimonJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { + if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count >= 0) { + auto rows = std::min(_remaining_table_level_row_count, + (int64_t)_state->query_options().batch_size); + _remaining_table_level_row_count -= rows; + auto mutate_columns = block->mutate_columns(); + for (auto& col : mutate_columns) { + col->resize(rows); + } + block->set_columns(std::move(mutate_columns)); + *read_rows = rows; + if 
(_remaining_table_level_row_count == 0) { + *eof = true; + } + + return Status::OK(); + } return _jni_connector->get_next_block(block, read_rows, eof); } diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.h b/be/src/vec/exec/format/table/paimon_jni_reader.h index 3ac2229e655c4a..6ed9a57f62e177 100644 --- a/be/src/vec/exec/format/table/paimon_jni_reader.h +++ b/be/src/vec/exec/format/table/paimon_jni_reader.h @@ -68,6 +68,7 @@ class PaimonJniReader : public JniReader { private: const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range; + int64_t _remaining_table_level_row_count; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/paimon_reader.cpp b/be/src/vec/exec/format/table/paimon_reader.cpp index 055d6179b2c422..cd8b7c0060f6d8 100644 --- a/be/src/vec/exec/format/table/paimon_reader.cpp +++ b/be/src/vec/exec/format/table/paimon_reader.cpp @@ -20,12 +20,15 @@ #include #include "common/status.h" +#include "runtime/runtime_state.h" #include "util/deletion_vector.h" namespace doris::vectorized { PaimonReader::PaimonReader(std::unique_ptr<GenericReader> file_format_reader, - RuntimeProfile* profile, const TFileScanRangeParams& params) - : TableFormatReader(std::move(file_format_reader)), _profile(profile), _params(params) { + RuntimeProfile* profile, RuntimeState* state, + const TFileScanRangeParams& params, const TFileRangeDesc& range, + io::IOContext* io_ctx) + : TableFormatReader(std::move(file_format_reader), state, profile, params, range, io_ctx) { static const char* paimon_profile = "PaimonProfile"; ADD_TIMER(_profile, paimon_profile); _paimon_profile.num_delete_rows = @@ -34,15 +37,18 @@ PaimonReader::PaimonReader(std::unique_ptr<GenericReader> file_format_reader, ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", paimon_profile); } -Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) { - const auto& table_desc = range.table_format_params.paimon_params; +Status PaimonReader::init_row_filters() { + const auto& table_desc = _range.table_format_params.paimon_params; if (!table_desc.__isset.deletion_file) { return Status::OK(); } // set push down agg type to NONE because we can not do count push down opt // if there are delete files. 
- _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE); + if (!_range.table_format_params.paimon_params.__isset.row_count) { + _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE); + } + const auto& deletion_file = table_desc.deletion_file; io::FileSystemProperties properties = { .system_type = _params.file_type, @@ -50,9 +56,9 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext .hdfs_params = _params.hdfs_params, .broker_addresses {}, }; - if (range.__isset.file_type) { + if (_range.__isset.file_type) { // for compatibility - properties.system_type = range.file_type; + properties.system_type = _range.file_type; } if (_params.__isset.broker_addresses) { properties.broker_addresses.assign(_params.broker_addresses.begin(), @@ -63,7 +69,7 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext .path = deletion_file.path, .file_size = -1, .mtime = 0, - .fs_name = range.fs_name, + .fs_name = _range.fs_name, }; // TODO: cache the file in local @@ -77,7 +83,7 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext { SCOPED_TIMER(_paimon_profile.delete_files_read_time); RETURN_IF_ERROR( - delete_file_reader->read_at(deletion_file.offset, result, &bytes_read, io_ctx)); + delete_file_reader->read_at(deletion_file.offset, result, &bytes_read, _io_ctx)); } if (bytes_read != deletion_file.length + 4) { return Status::IOError( @@ -98,4 +104,8 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext } return Status::OK(); } + +Status PaimonReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) { + return _file_format_reader->get_next_block(block, read_rows, eof); +} } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/paimon_reader.h b/be/src/vec/exec/format/table/paimon_reader.h index 3d82b7a3b5c416..dcc7bf9d700ff6 100644 --- a/be/src/vec/exec/format/table/paimon_reader.h +++ b/be/src/vec/exec/format/table/paimon_reader.h @@ -28,10 +28,13 @@ namespace doris::vectorized { class PaimonReader : public TableFormatReader { public: PaimonReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile, - const TFileScanRangeParams& params); + RuntimeState* state, const TFileScanRangeParams& params, + const TFileRangeDesc& range, io::IOContext* io_ctx); ~PaimonReader() override = default; - Status init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) final; + Status init_row_filters() final; + + Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final; protected: struct PaimonProfile { @@ -39,23 +42,21 @@ class PaimonReader : public TableFormatReader { RuntimeProfile::Counter* delete_files_read_time; }; std::vector<int64_t> _delete_rows; - RuntimeProfile* _profile; PaimonProfile _paimon_profile; - virtual void set_delete_rows() = 0; -private: - const TFileScanRangeParams& _params; + virtual void set_delete_rows() = 0; }; class PaimonOrcReader final : public PaimonReader { public: ENABLE_FACTORY_CREATOR(PaimonOrcReader); PaimonOrcReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile, - const TFileScanRangeParams& params) - : PaimonReader(std::move(file_format_reader), profile, params) {}; + RuntimeState* state, const TFileScanRangeParams& params, + const TFileRangeDesc& range, io::IOContext* io_ctx) + : PaimonReader(std::move(file_format_reader), profile, state, params, range, io_ctx) {}; ~PaimonOrcReader() final = default; - void set_delete_rows() override { + void set_delete_rows() final { 
(reinterpret_cast<OrcReader*>(_file_format_reader.get())) ->set_position_delete_rowids(&_delete_rows); } @@ -65,11 +66,12 @@ class PaimonParquetReader final : public PaimonReader { public: ENABLE_FACTORY_CREATOR(PaimonParquetReader); PaimonParquetReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile, - const TFileScanRangeParams& params) - : PaimonReader(std::move(file_format_reader), profile, params) {}; + RuntimeState* state, const TFileScanRangeParams& params, + const TFileRangeDesc& range, io::IOContext* io_ctx) + : PaimonReader(std::move(file_format_reader), profile, state, params, range, io_ctx) {}; ~PaimonParquetReader() final = default; - void set_delete_rows() override { + void set_delete_rows() final { (reinterpret_cast<ParquetReader*>(_file_format_reader.get())) ->set_delete_rows(&_delete_rows); } diff --git a/be/src/vec/exec/format/table/table_format_reader.cpp b/be/src/vec/exec/format/table/table_format_reader.cpp deleted file mode 100644 index ea8111d81b3d04..00000000000000 --- a/be/src/vec/exec/format/table/table_format_reader.cpp +++ /dev/null @@ -1,25 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include "table_format_reader.h" - -namespace doris::vectorized { - -TableFormatReader::TableFormatReader(std::unique_ptr<GenericReader> file_format_reader) - : _file_format_reader(std::move(file_format_reader)) {} - -} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exec/format/table/table_format_reader.h b/be/src/vec/exec/format/table/table_format_reader.h index 5a102a7665e8f1..0257a94a09b79a 100644 --- a/be/src/vec/exec/format/table/table_format_reader.h +++ b/be/src/vec/exec/format/table/table_format_reader.h @@ -17,14 +17,14 @@ #pragma once -#include - -#include +#include +#include #include -#include -#include #include "common/status.h" +#include "runtime/runtime_state.h" +#include "util/runtime_profile.h" +#include "vec/core/block.h" #include "vec/exec/format/generic_reader.h" namespace doris { @@ -40,11 +40,44 @@ namespace doris::vectorized { class TableFormatReader : public GenericReader { public: - TableFormatReader(std::unique_ptr<GenericReader> file_format_reader); + TableFormatReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeState* state, + RuntimeProfile* profile, const TFileScanRangeParams& params, + const TFileRangeDesc& range, io::IOContext* io_ctx) + : _file_format_reader(std::move(file_format_reader)), + _state(state), + _profile(profile), + _params(params), + _range(range), + _io_ctx(io_ctx) { + if (range.table_format_params.__isset.table_level_row_count) { + _table_level_row_count = range.table_format_params.table_level_row_count; + } else { + _table_level_row_count = -1; + } + } ~TableFormatReader() override = default; - Status get_next_block(Block* block, size_t* read_rows, bool* eof) override { - return _file_format_reader->get_next_block(block, read_rows, eof); + Status get_next_block(Block* block, size_t* read_rows, bool* eof) final { + if (_push_down_agg_type == TPushAggOp::type::COUNT && _table_level_row_count >= 0) { + auto rows = + std::min(_table_level_row_count, (int64_t)_state->query_options().batch_size); + _table_level_row_count -= rows; + auto mutate_columns = block->mutate_columns(); + for (auto& col : mutate_columns) { + col->resize(rows); + } + block->set_columns(std::move(mutate_columns)); + *read_rows = rows; + if (_table_level_row_count == 0) { + *eof = true; + } + + return Status::OK(); + } + return get_next_block_inner(block, read_rows, eof); } + + virtual Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) = 0; + Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, std::unordered_set<std::string>* missing_cols) override { return _file_format_reader->get_columns(name_to_type, missing_cols); @@ -64,18 +97,22 @@ class TableFormatReader : public GenericReader { bool fill_all_columns() const override { return _file_format_reader->fill_all_columns(); } - virtual Status init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) = 0; + virtual Status init_row_filters() = 0; protected: + std::string _table_format; // hudi, iceberg, paimon + std::unique_ptr<GenericReader> _file_format_reader; // parquet, orc + RuntimeState* _state = nullptr; // for query options + RuntimeProfile* _profile = nullptr; + const TFileScanRangeParams& _params; + const TFileRangeDesc& _range; + io::IOContext* _io_ctx = nullptr; + int64_t _table_level_row_count = -1; // for optimization of count(*) push down void _collect_profile_before_close() override { if (_file_format_reader != nullptr) { _file_format_reader->collect_profile_before_close(); } } - -protected: - std::string _table_format; // hudi, iceberg - std::unique_ptr<GenericReader> _file_format_reader; // parquet, orc }; } // 
namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp b/be/src/vec/exec/format/table/transactional_hive_reader.cpp index 8be11f6773a445..f1d02c3639926f 100644 --- a/be/src/vec/exec/format/table/transactional_hive_reader.cpp +++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp @@ -17,7 +17,6 @@ #include "transactional_hive_reader.h" -#include "runtime/runtime_state.h" #include "transactional_hive_common.h" #include "vec/data_types/data_type_factory.hpp" #include "vec/exec/format/orc/vorc_reader.h" @@ -38,12 +37,7 @@ TransactionalHiveReader::TransactionalHiveReader(std::unique_ptr RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params, const TFileRangeDesc& range, io::IOContext* io_ctx) - : TableFormatReader(std::move(file_format_reader)), - _profile(profile), - _state(state), - _params(params), - _range(range), - _io_ctx(io_ctx) { + : TableFormatReader(std::move(file_format_reader), state, profile, params, range, io_ctx) { static const char* transactional_hive_profile = "TransactionalHiveProfile"; ADD_TIMER(_profile, transactional_hive_profile); _transactional_orc_profile.num_delete_files = @@ -71,7 +65,7 @@ Status TransactionalHiveReader::init_reader( return status; } -Status TransactionalHiveReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { +Status TransactionalHiveReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) { for (int i = 0; i < TransactionalHive::READ_PARAMS.size(); ++i) { DataTypePtr data_type = DataTypeFactory::instance().create_data_type( TypeDescriptor(TransactionalHive::READ_PARAMS[i].type), false); @@ -90,8 +84,7 @@ Status TransactionalHiveReader::get_columns( return _file_format_reader->get_columns(name_to_type, missing_cols); } -Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range, - io::IOContext* io_ctx) { +Status TransactionalHiveReader::init_row_filters() { std::string data_file_path = _range.path; // the path in _range is remove the namenode prefix, // and the file_path in delete file is full path, so we should add it back. 
@@ -109,7 +102,8 @@ Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range, std::filesystem::path file_path(data_file_path); SCOPED_TIMER(_transactional_orc_profile.delete_files_read_time); - for (auto& delete_delta : range.table_format_params.transactional_hive_params.delete_deltas) { + for (const auto& delete_delta : + _range.table_format_params.transactional_hive_params.delete_deltas) { const std::string file_name = file_path.filename().string(); auto iter = std::find(delete_delta.file_names.begin(), delete_delta.file_names.end(), file_name); diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.h b/be/src/vec/exec/format/table/transactional_hive_reader.h index 23a691d037b96b..f27f33f45635fc 100644 --- a/be/src/vec/exec/format/table/transactional_hive_reader.h +++ b/be/src/vec/exec/format/table/transactional_hive_reader.h @@ -30,8 +30,6 @@ #include "common/status.h" #include "exec/olap_common.h" #include "table_format_reader.h" -#include "util/runtime_profile.h" -#include "vec/columns/column_dictionary.h" #include "vec/common/hash_table/phmap_fwd_decl.h" namespace doris { @@ -89,12 +87,12 @@ class TransactionalHiveReader : public TableFormatReader { io::IOContext* io_ctx); ~TransactionalHiveReader() override = default; - Status init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) override; + Status init_row_filters() final; - Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; + Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final; Status get_columns(std::unordered_map* name_to_type, - std::unordered_set* missing_cols) override; + std::unordered_set* missing_cols) final; Status init_reader( const std::vector& column_names, @@ -111,16 +109,10 @@ class TransactionalHiveReader : public TableFormatReader { RuntimeProfile::Counter* delete_files_read_time = nullptr; }; - RuntimeProfile* _profile = nullptr; - RuntimeState* _state = nullptr; - const TFileScanRangeParams& _params; - const TFileRangeDesc& _range; TransactionalHiveProfile _transactional_orc_profile; AcidRowIDSet _delete_rows; std::unique_ptr _delete_rows_filter_ptr; std::vector _col_names; - - io::IOContext* _io_ctx = nullptr; }; inline bool operator<(const TransactionalHiveReader::AcidRowID& lhs, diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 8a1f64a55b4054..985111cd04051d 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -27,7 +27,6 @@ #include #include -#include #include #include #include @@ -970,8 +969,8 @@ Status VFileScanner::_get_next_reader() { &_slot_id_to_filter_conjuncts); std::unique_ptr paimon_reader = PaimonParquetReader::create_unique(std::move(parquet_reader), _profile, - *_params); - RETURN_IF_ERROR(paimon_reader->init_row_filters(range, _io_ctx.get())); + _state, *_params, range, _io_ctx.get()); + RETURN_IF_ERROR(paimon_reader->init_row_filters()); _cur_reader = std::move(paimon_reader); } else { bool hive_parquet_use_column_names = true; @@ -1012,7 +1011,7 @@ Status VFileScanner::_get_next_reader() { _file_col_names, _colname_to_value_range, _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts); - RETURN_IF_ERROR(tran_orc_reader->init_row_filters(range, _io_ctx.get())); + RETURN_IF_ERROR(tran_orc_reader->init_row_filters()); _cur_reader = std::move(tran_orc_reader); } else if (range.__isset.table_format_params && 
range.table_format_params.table_format_type == "iceberg") { @@ -1032,9 +1031,9 @@ Status VFileScanner::_get_next_reader() { &_file_col_names, _colname_to_value_range, _push_down_conjuncts, false, _real_tuple_desc, _default_val_row_desc.get(), &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts); - std::unique_ptr paimon_reader = - PaimonOrcReader::create_unique(std::move(orc_reader), _profile, *_params); - RETURN_IF_ERROR(paimon_reader->init_row_filters(range, _io_ctx.get())); + std::unique_ptr paimon_reader = PaimonOrcReader::create_unique( + std::move(orc_reader), _profile, _state, *_params, range, _io_ctx.get()); + RETURN_IF_ERROR(paimon_reader->init_row_filters()); _cur_reader = std::move(paimon_reader); } else { bool hive_orc_use_column_names = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java index b7d34312313308..8d3aeaa6a267a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java @@ -61,6 +61,8 @@ public abstract class FileScanNode extends ExternalScanNode { // For explain protected long totalFileSize = 0; protected long totalPartitionNum = 0; + // For display pushdown agg result + protected long tableLevelRowCount = -1; public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType, boolean needCheckColumnPriv) { @@ -82,11 +84,12 @@ protected void toThrift(TPlanNode planNode) { super.toThrift(planNode); } - public long getPushDownCount() { - // 1. Do not use `0`: If the number of entries in the table is 0, - // it is unclear whether optimization has been performed. - // 2. Do not use `null` or `-`: This makes it easier for the program to parse the `explain` data. 
- return -1; + protected void setPushDownCount(long count) { + tableLevelRowCount = count; + } + + private long getPushDownCount() { + return tableLevelRowCount; } @Override @@ -106,9 +109,9 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { output.append("(approximate)"); } output.append("inputSplitNum=").append(selectedSplitNum).append(", totalFileSize=") - .append(totalFileSize).append(", scanRanges=").append(scanRangeLocations.size()).append("\n"); + .append(totalFileSize).append(", scanRanges=").append(scanRangeLocations.size()).append("\n"); output.append(prefix).append("partition=").append(selectedPartitionNum).append("/").append(totalPartitionNum) - .append("\n"); + .append("\n"); if (detailLevel == TExplainLevel.VERBOSE && !isBatchMode()) { output.append(prefix).append("backends:").append("\n"); @@ -133,25 +136,25 @@ public int compare(TFileRangeDesc o1, TFileRangeDesc o2) { if (size <= 4) { for (TFileRangeDesc file : fileRangeDescs) { output.append(prefix).append(" ").append(file.getPath()) - .append(" start: ").append(file.getStartOffset()) - .append(" length: ").append(file.getSize()) - .append("\n"); + .append(" start: ").append(file.getStartOffset()) + .append(" length: ").append(file.getSize()) + .append("\n"); } } else { for (int i = 0; i < 3; i++) { TFileRangeDesc file = fileRangeDescs.get(i); output.append(prefix).append(" ").append(file.getPath()) - .append(" start: ").append(file.getStartOffset()) - .append(" length: ").append(file.getSize()) - .append("\n"); + .append(" start: ").append(file.getStartOffset()) + .append(" length: ").append(file.getSize()) + .append("\n"); } int other = size - 4; output.append(prefix).append(" ... other ").append(other).append(" files ...\n"); TFileRangeDesc file = fileRangeDescs.get(size - 1); output.append(prefix).append(" ").append(file.getPath()) - .append(" start: ").append(file.getStartOffset()) - .append(" length: ").append(file.getSize()) - .append("\n"); + .append(" start: ").append(file.getStartOffset()) + .append(" length: ").append(file.getSize()) + .append("\n"); } } } @@ -182,10 +185,10 @@ public int compare(TFileRangeDesc o1, TFileRangeDesc o2) { } protected void setDefaultValueExprs(TableIf tbl, - Map slotDescByName, - Map exprByName, - TFileScanRangeParams params, - boolean useVarcharAsNull) throws UserException { + Map slotDescByName, + Map exprByName, + TFileScanRangeParams params, + boolean useVarcharAsNull) throws UserException { Preconditions.checkNotNull(tbl); TExpr tExpr = new TExpr(); tExpr.setNodes(Lists.newArrayList()); @@ -222,7 +225,8 @@ protected void setDefaultValueExprs(TableIf tbl, // if slot desc is null, which mean it is an unrelated slot, just skip. // eg: // (a, b, c) set (x=a, y=b, z=c) - // c does not exist in file, the z will be filled with null, even if z has default value. + // c does not exist in file, the z will be filled with null, even if z has + // default value. // and if z is not nullable, the load will fail. 
if (slotDesc != null) { if (expr != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index 9551ea203887d4..34efa59a459e00 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -166,12 +166,12 @@ protected void setScanParams(TFileRangeDesc rangeDesc, Split split) { private void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSplit) { TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value()); + if (tableLevelPushDownCount) { + tableFormatFileDesc.setTableLevelRowCount(icebergSplit.getTableLevelRowCount()); + } TIcebergFileDesc fileDesc = new TIcebergFileDesc(); fileDesc.setFormatVersion(formatVersion); fileDesc.setOriginalFilePath(icebergSplit.getOriginalPath()); - if (tableLevelPushDownCount) { - fileDesc.setRowCount(icebergSplit.getTableLevelRowCount()); - } if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) { fileDesc.setContent(FileContent.DATA.id()); } else { @@ -382,6 +382,7 @@ private List doGetSplits(int numBackends) throws UserException { break; } } + setPushDownCount(countFromSnapshot); assignCountToSplits(splits, countFromSnapshot); return splits; } else { @@ -563,11 +564,6 @@ protected void toThrift(TPlanNode planNode) { super.toThrift(planNode); } - @Override - public long getPushDownCount() { - return getCountFromSnapshot(); - } - @Override public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { if (pushdownIcebergPredicates.isEmpty()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index a856139ed18296..511c49a8525803 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -56,6 +56,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Base64; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -105,8 +106,6 @@ public String toString() { private int paimonSplitNum = 0; private List splitStats = new ArrayList<>(); private String serializedTable; - - private boolean pushDownCount = false; private static final long COUNT_WITH_PARALLEL_SPLITS = 10000; public PaimonScanNode(PlanNodeId id, @@ -193,7 +192,8 @@ private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) fileDesc.setDbId(((PaimonExternalTable) source.getTargetTable()).getDbId()); fileDesc.setTblId(source.getTargetTable().getId()); fileDesc.setLastUpdateTime(source.getTargetTable().getUpdateTime()); - // The hadoop conf should be same with PaimonExternalCatalog.createCatalog()#getConfiguration() + // The hadoop conf should be same with + // PaimonExternalCatalog.createCatalog()#getConfiguration() fileDesc.setHadoopConf(source.getCatalog().getCatalogProperty().getHadoopProperties()); Optional optDeletionFile = paimonSplit.getDeletionFile(); if (optDeletionFile.isPresent()) { @@ -208,10 +208,41 @@ private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) tDeletionFile.setLength(deletionFile.length()); 
fileDesc.setDeletionFile(tDeletionFile); } + if (paimonSplit.getRowCount().isPresent()) { + tableFormatFileDesc.setTableLevelRowCount(paimonSplit.getRowCount().get()); + } tableFormatFileDesc.setPaimonParams(fileDesc); rangeDesc.setTableFormatParams(tableFormatFileDesc); } + @VisibleForTesting + public static Optional<Long> calcuteTableLevelCount(List<org.apache.paimon.table.source.Split> paimonSplits) { + // The table-level count can be computed only if every deletion vector + // that is present has a known (non-null) cardinality + long totalCount = 0; + long deletionVectorCount = 0; + + for (org.apache.paimon.table.source.Split s : paimonSplits) { + totalCount += s.rowCount(); + + Optional<List<DeletionFile>> deletionFiles = s.deletionFiles(); + if (deletionFiles.isPresent()) { + for (DeletionFile dv : deletionFiles.get()) { + if (dv != null) { + Long cardinality = dv.cardinality(); + if (cardinality == null) { + // an unknown cardinality means the table-level count cannot be computed + return Optional.empty(); + } else { + deletionVectorCount += cardinality; + } + } + } + } + } + return Optional.of(totalCount - deletionVectorCount); + } + @Override public List<Split> getSplits(int numBackends) throws UserException { boolean forceJniScanner = sessionVariable.isForceJniScanner(); @@ -223,7 +254,9 @@ public List<Split> getSplits(int numBackends) throws UserException { boolean applyCountPushdown = getPushDownAggNoGroupingOp() == TPushAggOp.COUNT; // Just for counting the number of selected partitions for this paimon table Set selectedPartitionValues = Sets.newHashSet(); - long realFileSplitSize = getRealFileSplitSize(0); + // if applyCountPushdown is true, we cannot split the file, + // because raw files and deletion vectors are mapped one-to-one + long realFileSplitSize = getRealFileSplitSize(applyCountPushdown ? Long.MAX_VALUE : 0); for (org.apache.paimon.table.source.Split split : paimonSplits) { SplitStat splitStat = new SplitStat(); splitStat.setRowCount(split.rowCount()); @@ -241,38 +274,32 @@ public List<Split> getSplits(int numBackends) throws UserException { splitStat.setType(SplitReadType.NATIVE); splitStat.setRawFileConvertable(true); List<RawFile> rawFiles = optRawFiles.get(); - if (optDeletionFiles.isPresent()) { - List<DeletionFile> deletionFiles = optDeletionFiles.get(); - for (int i = 0; i < rawFiles.size(); i++) { - RawFile file = rawFiles.get(i); - DeletionFile deletionFile = deletionFiles.get(i); - LocationPath locationPath = new LocationPath(file.path(), - source.getCatalog().getProperties()); - try { - List<Split> dorisSplits = FileSplitter.splitFile( - locationPath, - realFileSplitSize, - null, - file.length(), - -1, - true, - null, - PaimonSplit.PaimonSplitCreator.DEFAULT); - for (Split dorisSplit : dorisSplits) { - // the element in DeletionFiles might be null - if (deletionFile != null) { - splitStat.setHasDeletionVector(true); - ((PaimonSplit) dorisSplit).setDeletionFile(deletionFile); - } - splits.add(dorisSplit); + for (int i = 0; i < rawFiles.size(); i++) { + RawFile file = rawFiles.get(i); + LocationPath locationPath = new LocationPath(file.path(), + source.getCatalog().getProperties()); + try { + List<Split> dorisSplits = FileSplitter.splitFile( + locationPath, + realFileSplitSize, + null, + file.length(), + -1, + true, + null, + PaimonSplit.PaimonSplitCreator.DEFAULT); + for (Split dorisSplit : dorisSplits) { + // attach the matching deletion file, if any + if (optDeletionFiles.isPresent() && optDeletionFiles.get().get(i) != null) { + ((PaimonSplit) dorisSplit).setDeletionFile(optDeletionFiles.get().get(i)); + splitStat.setHasDeletionVector(true); } - ++rawFileSplitNum; - } catch (IOException e) { - 
throw new UserException("Paimon error to split file: " + e.getMessage(), e); } + splits.addAll(dorisSplits); + ++rawFileSplitNum; + } catch (IOException e) { + throw new UserException("Paimon error to split file: " + e.getMessage(), e); } - } else { - createRawFileSplits(rawFiles, splits, applyCountPushdown ? Long.MAX_VALUE : 0); } } else { if (ignoreSplitType == SessionVariable.IgnoreSplitType.IGNORE_JNI) { @@ -294,47 +321,46 @@ public List<Split> getSplits(int numBackends) throws UserException { // We need to set the target size for all splits so that we can calculate the proportion of each split later. splits.forEach(s -> s.setTargetSplitSize(realFileSplitSize)); + // if applyCountPushdown is true, calculate the row count for count pushdown + if (applyCountPushdown) { + // an empty table yields no splits; return directly, there is nothing to assign the count to + if (splits.isEmpty()) { + return splits; + } + Optional<Long> optTableLevelCount = calcuteTableLevelCount(paimonSplits); + if (optTableLevelCount.isPresent()) { + long tableLevelRowCount = optTableLevelCount.get(); + List<Split> pushDownCountSplits; + if (tableLevelRowCount > COUNT_WITH_PARALLEL_SPLITS) { + int minSplits = sessionVariable.getParallelExecInstanceNum() * numBackends; + pushDownCountSplits = splits.subList(0, Math.min(splits.size(), minSplits)); + } else { + pushDownCountSplits = Collections.singletonList(splits.get(0)); + } + setPushDownCount(tableLevelRowCount); + assignCountToSplits(pushDownCountSplits, tableLevelRowCount); + return pushDownCountSplits; + } + } + this.selectedPartitionNum = selectedPartitionValues.size(); - // TODO: get total partition number return splits; } @VisibleForTesting public List<org.apache.paimon.table.source.Split> getPaimonSplitFromAPI() { int[] projected = desc.getSlots().stream().mapToInt( - slot -> source.getPaimonTable().rowType() .getFieldNames() .stream() .map(String::toLowerCase) .collect(Collectors.toList()) .indexOf(slot.getColumn().getName())) - .toArray(); ReadBuilder readBuilder = source.getPaimonTable().newReadBuilder(); return readBuilder.withFilter(predicates) - .withProjection(projected) - .newScan().plan().splits(); - } - - private void createRawFileSplits(List<RawFile> rawFiles, List<Split> splits, long blockSize) throws UserException { - for (RawFile file : rawFiles) { - LocationPath locationPath = new LocationPath(file.path(), - source.getCatalog().getProperties()); - try { - splits.addAll( - FileSplitter.splitFile( - locationPath, - getRealFileSplitSize(blockSize), - null, - file.length(), - -1, - true, - null, - PaimonSplit.PaimonSplitCreator.DEFAULT)); - ++rawFileSplitNum; - } catch (IOException e) { - throw new UserException("Paimon error to split file: " + e.getMessage(), e); - } - } + .withProjection(projected) + .newScan().plan().splits(); } private String getFileFormat(String path) { @@ -421,4 +447,13 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { } return sb.toString(); } + + private void assignCountToSplits(List<Split> splits, long totalCount) { + int size = splits.size(); + long countPerSplit = totalCount / size; + for (int i = 0; i < size - 1; i++) { + ((PaimonSplit) splits.get(i)).setRowCount(countPerSplit); + } + ((PaimonSplit) splits.get(size - 1)).setRowCount(countPerSplit + totalCount % size); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java index 988f043ad0e7d7..f4d3d724089477 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java @@ -36,13 +36,13 @@ public class PaimonSplit extends FileSplit { private static final LocationPath DUMMY_PATH = new LocationPath("/dummyPath", Maps.newHashMap()); private Split split; private TableFormatType tableFormatType; - private Optional optDeletionFile; + private Optional optDeletionFile = Optional.empty(); + private Optional optRowCount = Optional.empty(); public PaimonSplit(Split split) { super(DUMMY_PATH, 0, 0, 0, 0, null, null); this.split = split; this.tableFormatType = TableFormatType.PAIMON; - this.optDeletionFile = Optional.empty(); if (split instanceof DataSplit) { List dataFileMetas = ((DataSplit) split).dataFiles(); @@ -57,7 +57,6 @@ private PaimonSplit(LocationPath file, long start, long length, long fileLength, String[] hosts, List partitionList) { super(file, start, length, fileLength, modificationTime, hosts, partitionList); this.tableFormatType = TableFormatType.PAIMON; - this.optDeletionFile = Optional.empty(); this.selfSplitWeight = length; } @@ -90,6 +89,14 @@ public void setDeletionFile(DeletionFile deletionFile) { this.optDeletionFile = Optional.of(deletionFile); } + public Optional getRowCount() { + return optRowCount; + } + + public void setRowCount(long rowCount) { + this.optRowCount = Optional.of(rowCount); + } + public static class PaimonSplitCreator implements SplitCreator { static final PaimonSplitCreator DEFAULT = new PaimonSplitCreator(); @@ -103,7 +110,7 @@ public org.apache.doris.spi.Split create(LocationPath path, long modificationTime, String[] hosts, List partitionValues) { - PaimonSplit split = new PaimonSplit(path, start, length, fileLength, + PaimonSplit split = new PaimonSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues); split.setTargetSplitSize(fileSplitSize); return split; diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java index 4a2d61f2c7dc0b..bc253680eb0a73 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java @@ -33,7 +33,9 @@ import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.stats.SimpleStats; import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.DeletionFile; import org.apache.paimon.table.source.RawFile; +import org.apache.paimon.table.source.Split; import org.junit.Assert; import org.junit.Test; @@ -45,6 +47,131 @@ public class PaimonScanNodeTest { + @Test + public void testCalcuteTableLevelCount() { + List splits = new ArrayList<>(); + + // Create mock splits with row count and deletion files + Split split1 = new Split() { + @Override + public long rowCount() { + return 100; + } + + @Override + public Optional> deletionFiles() { + List deletionFiles = new ArrayList<>(); + deletionFiles.add(new DeletionFile("path1", 0, 10, 10L)); + deletionFiles.add(new DeletionFile("path2", 0, 20, 20L)); + return Optional.of(deletionFiles); + } + }; + + Split split2 = new Split() { + @Override + public long rowCount() { + return 200; + } + + @Override + public Optional> deletionFiles() { + List deletionFiles = new ArrayList<>(); + deletionFiles.add(new DeletionFile("path3", 0, 30, 30L)); + deletionFiles.add(new 
DeletionFile("path4", 0, 40, 40L)); + return Optional.of(deletionFiles); + } + }; + + splits.add(split1); + splits.add(split2); + + Optional result = PaimonScanNode.calcuteTableLevelCount(splits); + Assert.assertTrue(result.isPresent()); + Assert.assertEquals(200, result.get().longValue()); + } + + @Test + public void testCalcuteTableLevelCountWithNullDeletionFile() { + List splits = new ArrayList<>(); + + // Create mock splits with row count and null deletion files + Split split1 = new Split() { + @Override + public long rowCount() { + return 100; + } + + @Override + public Optional> deletionFiles() { + List deletionFiles = new ArrayList<>(); + deletionFiles.add(null); + deletionFiles.add(new DeletionFile("path2", 0, 20, 20L)); + return Optional.of(deletionFiles); + } + }; + + Split split2 = new Split() { + @Override + public long rowCount() { + return 200; + } + + @Override + public Optional> deletionFiles() { + return Optional.empty(); + } + }; + + splits.add(split1); + splits.add(split2); + + Optional result = PaimonScanNode.calcuteTableLevelCount(splits); + Assert.assertTrue(result.isPresent()); + Assert.assertEquals(280, result.get().longValue()); + } + + @Test + public void testCalcuteTableLevelCountWithNullCardinality() { + List splits = new ArrayList<>(); + + // Create mock splits with row count and deletion files with null cardinality + Split split1 = new Split() { + @Override + public long rowCount() { + return 100; + } + + @Override + public Optional> deletionFiles() { + List deletionFiles = new ArrayList<>(); + deletionFiles.add(new DeletionFile("path1", 0, 10, null)); + deletionFiles.add(new DeletionFile("path2", 0, 20, 20L)); + return Optional.of(deletionFiles); + } + }; + + Split split2 = new Split() { + @Override + public long rowCount() { + return 200; + } + + @Override + public Optional> deletionFiles() { + List deletionFiles = new ArrayList<>(); + deletionFiles.add(new DeletionFile("path3", 0, 30, 30L)); + deletionFiles.add(null); + return Optional.of(deletionFiles); + } + }; + + splits.add(split1); + splits.add(split2); + + Optional result = PaimonScanNode.calcuteTableLevelCount(splits); + Assert.assertFalse(result.isPresent()); + } + @Mocked private SessionVariable sv; diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 611c58d2bfd6fc..7411383670fac5 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -314,6 +314,7 @@ struct TIcebergFileDesc { // Deprecated 5: optional Exprs.TExpr file_select_conjunct; 6: optional string original_file_path; + // Deprecated 7: optional i64 row_count; } @@ -338,6 +339,7 @@ struct TPaimonFileDesc { 12: optional TPaimonDeletionFileDesc deletion_file; 13: optional map hadoop_conf // deprecated 14: optional string paimon_table // deprecated + 15: optional i64 row_count // deprecated } struct TTrinoConnectorFileDesc { @@ -405,6 +407,7 @@ struct TTableFormatFileDesc { 6: optional TMaxComputeFileDesc max_compute_params 7: optional TTrinoConnectorFileDesc trino_connector_params 8: optional TLakeSoulFileDesc lakesoul_params + 9: optional i64 table_level_row_count } enum TTextSerdeType { diff --git a/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out b/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out index a394836625d751..f3b44964915230 100644 --- a/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out +++ b/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out @@ -578,26 +578,6 @@ bbb -- !c109 -- --- !c110 -- -3 
- --- !c111 -- -3 - --- !c112 -- -2 - --- !c113 -- -2 - --- !c114 -- -3 3_1 -4 4_1 - --- !c115 -- -3 3_1 -4 4_1 - -- !all -- 1 2 3 4 5 6 7 8 9.1 10.1 11.10 2020-02-02 13str 14varchar a true aaaa 2023-08-13T09:32:38.530 10 20 30 40 50 60 70 80 90.1 100.1 110.10 2020-03-02 130str 140varchar b false bbbb 2023-08-14T08:32:52.821 @@ -1177,26 +1157,6 @@ bbb -- !c109 -- --- !c110 -- -3 - --- !c111 -- -3 - --- !c112 -- -2 - --- !c113 -- -2 - --- !c114 -- -3 3_1 -4 4_1 - --- !c115 -- -3 3_1 -4 4_1 - -- !all -- 1 2 3 4 5 6 7 8 9.1 10.1 11.10 2020-02-02 13str 14varchar a true aaaa 2023-08-13T09:32:38.530 10 20 30 40 50 60 70 80 90.1 100.1 110.10 2020-03-02 130str 140varchar b false bbbb 2023-08-14T08:32:52.821 @@ -1776,26 +1736,6 @@ bbb -- !c109 -- --- !c110 -- -3 - --- !c111 -- -3 - --- !c112 -- -2 - --- !c113 -- -2 - --- !c114 -- -3 3_1 -4 4_1 - --- !c115 -- -3 3_1 -4 4_1 - -- !all -- 1 2 3 4 5 6 7 8 9.1 10.1 11.10 2020-02-02 13str 14varchar a true aaaa 2023-08-13T09:32:38.530 10 20 30 40 50 60 70 80 90.1 100.1 110.10 2020-03-02 130str 140varchar b false bbbb 2023-08-14T08:32:52.821 @@ -2375,23 +2315,3 @@ bbb -- !c109 -- --- !c110 -- -3 - --- !c111 -- -3 - --- !c112 -- -2 - --- !c113 -- -2 - --- !c114 -- -3 3_1 -4 4_1 - --- !c115 -- -3 3_1 -4 4_1 - diff --git a/regression-test/data/external_table_p0/paimon/test_paimon_deletion_vector.out b/regression-test/data/external_table_p0/paimon/test_paimon_deletion_vector.out new file mode 100644 index 00000000000000..f0b1e92a088538 --- /dev/null +++ b/regression-test/data/external_table_p0/paimon/test_paimon_deletion_vector.out @@ -0,0 +1,73 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !1 -- +3 + +-- !2 -- +3 + +-- !3 -- +2 + +-- !4 -- +2 + +-- !5 -- +3 3_1 +4 4_1 + +-- !6 -- +3 3_1 +4 4_1 + +-- !7 -- +1 Paimon 5 +4 Venti 25 +5 Kaeya 28 +6 Jean 26 +7 Diluc 30 +8 Razor 18 +9 Mona 19 +10 Barbara 16 + +-- !8 -- +8 + +-- !9 -- +7 + +-- !1 -- +3 + +-- !2 -- +3 + +-- !3 -- +2 + +-- !4 -- +2 + +-- !5 -- +3 3_1 +4 4_1 + +-- !6 -- +3 3_1 +4 4_1 + +-- !7 -- +1 Paimon 5 +4 Venti 25 +5 Kaeya 28 +6 Jean 26 +7 Diluc 30 +8 Razor 18 +9 Mona 19 +10 Barbara 16 + +-- !8 -- +8 + +-- !9 -- +7 + diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy index 9668cbb0950c5d..41afb02e0f932d 100644 --- a/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy +++ b/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy @@ -181,13 +181,6 @@ suite("test_paimon_catalog", "p0,external,doris,external_docker,external_docker_ def c108= """ select id from tb_with_upper_case where id = 1 """ def c109= """ select id from tb_with_upper_case where id < 1 """ - def c110 = """select count(*) from deletion_vector_orc;""" - def c111 = """select count(*) from deletion_vector_parquet;""" - def c112 = """select count(*) from deletion_vector_orc where id > 2;""" - def c113 = """select count(*) from deletion_vector_parquet where id > 2;""" - def c114 = """select * from deletion_vector_orc where id > 2;""" - def c115 = """select * from deletion_vector_parquet where id > 2;""" - String hdfs_port = context.config.otherConfigs.get("hive2HdfsPort") String catalog_name = "ctl_test_paimon_catalog" String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") @@ -296,13 +289,6 @@ suite("test_paimon_catalog", "p0,external,doris,external_docker,external_docker_ qt_c107 c107 qt_c108 c108 qt_c109 c109 - 
- qt_c110 c110 - qt_c111 c111 - qt_c112 c112 - qt_c113 c113 - qt_c114 c114 - qt_c115 c115 } test_cases("false", "false") diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_deletion_vector.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_deletion_vector.groovy new file mode 100644 index 00000000000000..fade251ed56f4a --- /dev/null +++ b/regression-test/suites/external_table_p0/paimon/test_paimon_deletion_vector.groovy @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_paimon_deletion_vector", "p0,external,doris,external_docker,external_docker_doris") { + + logger.info("start paimon test") + String enabled = context.config.otherConfigs.get("enablePaimonTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disabled paimon test") + return + } + + try { + String catalog_name = "test_paimon_deletion_vector" + String hdfs_port = context.config.otherConfigs.get("hive2HdfsPort") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + sql """drop catalog if exists ${catalog_name}""" + sql """create catalog if not exists ${catalog_name} properties ( + "type" = "paimon", + "paimon.catalog.type"="filesystem", + "warehouse" = "hdfs://${externalEnvIp}:${hdfs_port}/user/doris/paimon1" + );""" + sql """use `${catalog_name}`.`db1`""" + + def test_cases = { String force -> + sql """ set force_jni_scanner=${force} """ + qt_1 """select count(*) from deletion_vector_orc;""" + qt_2 """select count(*) from deletion_vector_parquet;""" + qt_3 """select count(*) from deletion_vector_orc where id > 2;""" + qt_4 """select count(*) from deletion_vector_parquet where id > 2;""" + qt_5 """select * from deletion_vector_orc where id > 2 order by id;""" + qt_6 """select * from deletion_vector_parquet where id > 2 order by id;""" + qt_7 """select * from deletion_vector_table_1_0 order by id;""" + qt_8 """select count(*) from deletion_vector_table_1_0;""" + qt_9 """select count(*) from deletion_vector_table_1_0 where id > 2;""" + } + + def test_table_count_push_down = { String force -> + sql """ set force_jni_scanner=${force} """ + explain { + sql("select count(*) from deletion_vector_orc;") + contains "pushdown agg=COUNT (-1)" + } + explain { + sql("select count(*) from deletion_vector_parquet;") + contains "pushdown agg=COUNT (-1)" + } + explain { + sql("select count(*) from deletion_vector_table_1_0;") + contains "pushdown agg=COUNT (8)" + } + } + + def test_not_table_count_push_down = { String force -> + sql """ set enable_count_push_down_for_external_table=false; """ + sql """ set force_jni_scanner=${force} """ + explain { + sql("select count(*) from deletion_vector_orc;") + contains "pushdown agg=NONE" + } + explain { + sql("select count(*) from 
deletion_vector_parquet;") + contains "pushdown agg=NONE" + } + explain { + sql("select count(*) from deletion_vector_table_1_0;") + contains "pushdown agg=NONE" + } + } + + test_cases("false") + test_cases("true") + test_table_count_push_down("false") + test_table_count_push_down("true") + test_not_table_count_push_down("false") + test_not_table_count_push_down("true") + } finally { + sql """ set enable_count_push_down_for_external_table=true; """ + sql """set force_jni_scanner=false""" + } + +} \ No newline at end of file
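
Three pieces of this patch are easier to verify in isolation. First, the COUNT(*) fast path that the change hoists out of the per-format readers into TableFormatReader::get_next_block: once the FE has attached table_level_row_count to the scan range, the reader never opens a data file and instead emits blocks resized to at most batch_size rows until the count is exhausted. Below is a minimal, self-contained sketch of that loop; MockBlock and CountFastPath are illustrative stand-ins, not Doris types.

#include <algorithm>
#include <cstdint>
#include <iostream>

// Stand-in for doris::vectorized::Block: only the row count matters,
// because the fast path resizes columns without reading any file data.
struct MockBlock {
    size_t rows = 0;
    void resize_columns(size_t n) { rows = n; }
};

// Sketch of the COUNT(*) fast path in TableFormatReader::get_next_block.
// A remaining count of -1 would mean "no FE-provided count"; the real
// reader then falls through to get_next_block_inner() and scans files.
class CountFastPath {
public:
    CountFastPath(int64_t table_level_row_count, int64_t batch_size)
            : _remaining(table_level_row_count), _batch_size(batch_size) {}

    size_t next_block(MockBlock* block, bool* eof) {
        int64_t rows = std::min(_remaining, _batch_size);
        _remaining -= rows;
        block->resize_columns(static_cast<size_t>(rows));
        if (_remaining == 0) {
            *eof = true;
        }
        return static_cast<size_t>(rows);
    }

private:
    int64_t _remaining;
    int64_t _batch_size;
};

int main() {
    CountFastPath reader(/*table_level_row_count=*/10007, /*batch_size=*/4096);
    MockBlock block;
    bool eof = false;
    size_t total = 0;
    while (!eof) {
        total += reader.next_block(&block, &eof);
    }
    std::cout << total << "\n"; // 10007, without touching a single data file
}

The resize-only trick is sound because COUNT(*) needs row counts, not cell values: the scan node consumes only *read_rows, so producing columns with no materialized data is enough.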
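Second, the decision rule behind PaimonScanNode.calcuteTableLevelCount: the table-level count equals the sum of split row counts minus the summed cardinalities of all deletion vectors, and it must be abandoned as soon as one deletion vector does not advertise a cardinality. Here is a sketch of the same rule under an assumed SplitInfo type, rather than Paimon's real Split/DeletionFile API.

#include <cstdint>
#include <iostream>
#include <optional>
#include <vector>

// Hypothetical mirror of a Paimon split: a raw row count plus the
// cardinalities of its deletion vectors, where nullopt models a
// DeletionFile whose cardinality() returns null.
struct SplitInfo {
    int64_t row_count;
    std::vector<std::optional<int64_t>> deletion_cardinalities;
};

// Total rows minus deleted rows, or nullopt when any deletion vector
// has unknown cardinality (mirrors returning Optional.empty()).
std::optional<int64_t> table_level_count(const std::vector<SplitInfo>& splits) {
    int64_t total = 0;
    int64_t deleted = 0;
    for (const auto& s : splits) {
        total += s.row_count;
        for (const auto& card : s.deletion_cardinalities) {
            if (!card) {
                return std::nullopt; // the count cannot be corrected
            }
            deleted += *card;
        }
    }
    return total - deleted;
}

int main() {
    // Mirrors testCalcuteTableLevelCount: (100 + 200) - (10 + 20 + 30 + 40) = 200.
    std::vector<SplitInfo> ok = {{100, {10, 20}}, {200, {30, 40}}};
    std::cout << table_level_count(ok).value() << "\n"; // 200

    // Mirrors testCalcuteTableLevelCountWithNullCardinality: unusable result.
    std::vector<SplitInfo> bad = {{100, {std::nullopt, 20}}, {200, {30}}};
    std::cout << table_level_count(bad).has_value() << "\n"; // 0 (empty)
}

The early return is what forces the "pushdown agg=COUNT (-1)" explain output in the regression test: with an unknown cardinality the FE cannot display or push an exact table-level count.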
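Finally, assignCountToSplits spreads the table-level count over the splits kept for the pushdown (at most parallel_exec_instance_num * numBackends of them when the count exceeds COUNT_WITH_PARALLEL_SPLITS), giving each split an equal share and folding the remainder into the last split so the shares sum to the exact total. The same arithmetic as a small sketch:

#include <cstdint>
#include <iostream>
#include <vector>

// Mirrors PaimonScanNode.assignCountToSplits: equal shares per split,
// with the remainder folded into the last one so the sum stays exact.
std::vector<int64_t> assign_count_to_splits(int64_t total, int num_splits) {
    std::vector<int64_t> per_split(num_splits, total / num_splits);
    per_split.back() += total % num_splits;
    return per_split;
}

int main() {
    for (int64_t n : assign_count_to_splits(10007, 4)) {
        std::cout << n << " "; // 2501 2501 2501 2504, summing to 10007
    }
    std::cout << "\n";
}

Putting the remainder on the last split (rather than rounding) matters because each backend answers COUNT(*) purely from the count its splits carry; any drift in the shares would change the query result.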