From e295bd98f2e7ab49377c1fb10320b767364e09fb Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Sat, 12 Aug 2023 13:26:47 +0800 Subject: [PATCH 1/6] optimize count --- be/src/vec/exec/format/generic_reader.h | 7 +++ .../vec/exec/format/parquet/vparquet_reader.h | 2 + .../vec/exec/format/table/iceberg_reader.cpp | 25 +++++++++ be/src/vec/exec/format/table/iceberg_reader.h | 3 + be/src/vec/exec/scan/vfile_scanner.cpp | 1 + be/src/vec/exec/scan/vscan_node.cpp | 5 ++ be/src/vec/exec/scan/vscan_node.h | 6 ++ .../org/apache/doris/planner/PlanNode.java | 4 ++ .../external/iceberg/IcebergScanNode.java | 55 ++++++++++++++++++- .../external/iceberg/IcebergSplit.java | 4 +- gensrc/thrift/PlanNodes.thrift | 2 + 11 files changed, 112 insertions(+), 2 deletions(-) diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h index 7b6f3c7b9c733a..fbb90e5ba7d705 100644 --- a/be/src/vec/exec/format/generic_reader.h +++ b/be/src/vec/exec/format/generic_reader.h @@ -36,6 +36,11 @@ class GenericReader { void set_push_down_agg_type(TPushAggOp::type push_down_agg_type) { _push_down_agg_type = push_down_agg_type; } + + void set_push_down_count(int64_t count) { + _push_down_count = count; + } + virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof) = 0; virtual std::unordered_map get_name_to_type() { @@ -75,6 +80,8 @@ class GenericReader { /// Whether the underlying FileReader has filled the partition&missing columns bool _fill_all_columns = false; TPushAggOp::type _push_down_agg_type; + // Record the value of the aggregate function 'count' from be + int64_t _push_down_count = -1; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index 0f3996db40ed7a..cdc9366c192418 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -149,6 +149,8 @@ class ParquetReader : public GenericReader { _table_col_to_file_col = map; } + size_t get_batch_size() { return _batch_size; } + private: struct ParquetProfile { RuntimeProfile::Counter* filtered_row_groups; diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index 6d2f57258680cc..f37619b541cd03 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -132,10 +132,31 @@ Status IcebergTableReader::init_reader( _all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range, conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id, not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts); + + _batch_size = parquet_reader->get_batch_size(); + _remaining_push_down_count = _push_down_count; + return status; } Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { + + // already get rows from be + if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_push_down_count > 0) { + + auto rows = std::min(_remaining_push_down_count, (int64_t)_batch_size); + _remaining_push_down_count -= rows; + for (auto& col : block->mutate_columns()) { + col->resize(rows); + } + *read_rows = rows; + if (_remaining_push_down_count == 0) { + *eof = true; + } + + return Status::OK(); + } + // To support iceberg schema evolution. We change the column name in block to // make it match with the column name in parquet file before reading data. and // Set the name back to table column name before return this block. @@ -149,6 +170,7 @@ Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* } block->initialize_index_by_name(); } + auto res = _file_format_reader->get_next_block(block, read_rows, eof); // Set the name back to table column name before return this block. if (_has_schema_change) { @@ -191,6 +213,9 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) { const std::vector& files = table_desc.delete_files; if (files.empty()) { return Status::OK(); + } else { + // TODO: wuewnchi 在上面直接做返回 + _push_down_agg_type = TPushAggOp::type::NONE; } if (delete_file_type == POSITION_DELETE) { RETURN_IF_ERROR(_position_delete(files)); diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h index 451c51445ed9fc..22b69ae304ed84 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.h +++ b/be/src/vec/exec/format/table/iceberg_reader.h @@ -154,6 +154,9 @@ class IcebergTableReader : public TableFormatReader { io::IOContext* _io_ctx; bool _has_schema_change = false; bool _has_iceberg_schema = false; + + size_t _batch_size; + int64_t _remaining_push_down_count; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 6365f60ed6331c..01b081ca1e645c 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -803,6 +803,7 @@ Status VFileScanner::_get_next_reader() { _missing_cols.clear(); _cur_reader->get_columns(&_name_to_col_type, &_missing_cols); _cur_reader->set_push_down_agg_type(_parent->get_push_down_agg_type()); + _cur_reader->set_push_down_count(_parent->get_push_down_count()); RETURN_IF_ERROR(_generate_fill_columns()); if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) { fmt::memory_buffer col_buf; diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index c45524667cabcb..e65bdab09302e5 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -126,6 +126,11 @@ Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) { } else { _push_down_agg_type = TPushAggOp::type::NONE; } + + if (tnode.__isset.push_down_count) { + _push_down_count = tnode.push_down_count; + } + return Status::OK(); } diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index 6b1922b0b8c0ee..c023bbabe2a176 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -136,6 +136,9 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer { } TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; } + + int64_t get_push_down_count() { return _push_down_count; } + // Get next block. // If eos is true, no more data will be read and block should be empty. Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) override; @@ -349,6 +352,9 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer { TPushAggOp::type _push_down_agg_type; + // Record the value of the aggregate function 'count' from doris's be + int64_t _push_down_count = -1; + private: Status _normalize_conjuncts(); Status _normalize_predicate(const VExprSPtr& conjunct_expr_root, VExprContext* context, diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index 579b80586be272..8dcc8043b447d3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -1191,6 +1191,10 @@ public void setPushDownAggNoGrouping(TPushAggOp pushDownAggNoGroupingOp) { this.pushDownAggNoGroupingOp = pushDownAggNoGroupingOp; } + public TPushAggOp getPushDownAggNoGroupingOp() { + return pushDownAggNoGroupingOp; + } + public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) { return false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java index 5fa64fa2b6a7e5..61ea218b6fb7cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java @@ -18,8 +18,10 @@ package org.apache.doris.planner.external.iceberg; import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.FunctionCallExpr; import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.external.ExternalTable; @@ -43,6 +45,8 @@ import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TIcebergDeleteFileDesc; import org.apache.doris.thrift.TIcebergFileDesc; +import org.apache.doris.thrift.TPlanNode; +import org.apache.doris.thrift.TPushAggOp; import org.apache.doris.thrift.TTableFormatFileDesc; import avro.shaded.com.google.common.base.Preconditions; @@ -55,6 +59,7 @@ import org.apache.iceberg.HistoryEntry; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.PartitionField; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.TableScan; @@ -68,7 +73,9 @@ import java.nio.ByteBuffer; import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -79,6 +86,9 @@ public class IcebergScanNode extends FileQueryScanNode { public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2; public static final String DEFAULT_DATA_PATH = "/data/"; + private static final String TOTAL_RECORDS = "total-records"; + private static final String TOTAL_POSITION_DELETES = "total-position-deletes"; + private static final String TOTAL_EQUALITY_DELETES = "total-equality-deletes"; private IcebergSource source; private Table icebergTable; @@ -210,8 +220,8 @@ public List getSplits() throws UserException { splitTask.length(), splitTask.file().fileSizeInBytes(), new String[0], + formatVersion, source.getCatalog().getProperties()); - split.setFormatVersion(formatVersion); if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) { split.setDeleteFileFilters(getDeleteFileFilters(splitTask)); } @@ -222,6 +232,12 @@ public List getSplits() throws UserException { throw new UserException(e.getMessage(), e.getCause()); } + + TPushAggOp aggOp = getPushDownAggNoGroupingOp(); + if (aggOp.equals(TPushAggOp.COUNT) && getCountFromSnapshot() > 0) { + return Collections.singletonList(splits.get(0)); + } + readPartitionNum = partitionPathSet.size(); return splits; @@ -334,4 +350,41 @@ public TableIf getTargetTable() { public Map getLocationProperties() throws UserException { return source.getCatalog().getProperties(); } + + @Override + public boolean pushDownAggNoGrouping(FunctionCallExpr aggExpr) { + String aggFunctionName = aggExpr.getFnName().getFunction().toUpperCase(); + return "COUNT".equals(aggFunctionName); + } + + @Override + public boolean pushDownAggNoGroupingCheckCol(FunctionCallExpr aggExpr, Column col) { + return !col.isAllowNull(); + } + + private long getCountFromSnapshot() { + Snapshot snapshot = icebergTable.currentSnapshot(); + // empty table + if (snapshot == null) { + return -1; + } + + Map summary = snapshot.summary(); + if (summary.get(TOTAL_EQUALITY_DELETES).equals("0")) { + return Long.parseLong(summary.get(TOTAL_RECORDS)) - Long.parseLong(summary.get(TOTAL_POSITION_DELETES)); + } else { + return -1; + } + } + + @Override + protected void toThrift(TPlanNode planNode) { + super.toThrift(planNode); + if (getPushDownAggNoGroupingOp().equals(TPushAggOp.COUNT)) { + long countFromSnapshot = getCountFromSnapshot(); + if (countFromSnapshot > 0) { + planNode.setPushDownCount(countFromSnapshot); + } + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java index de3f2ec6aad514..29deb293b3db75 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java @@ -27,10 +27,12 @@ @Data public class IcebergSplit extends FileSplit { + // File path will be changed if the file is modified, so there's no need to get modification time. public IcebergSplit(Path file, long start, long length, long fileLength, String[] hosts, - Map config) { + Integer formatVersion, Map config) { super(file, start, length, fileLength, hosts, null); + this.formatVersion = formatVersion; this.config = config; } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index a36368ef7af665..105b86f8b49d3e 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -1145,6 +1145,8 @@ struct TPlanNode { 47: optional TTestExternalScanNode test_external_scan_node 48: optional TPushAggOp push_down_agg_type_opt + + 49: optional i64 push_down_count 101: optional list projections 102: optional Types.TTupleId output_tuple_id From 9dd7572d4503fc7a9dc247733a2eaccf32ddb146 Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Sat, 12 Aug 2023 15:35:35 +0800 Subject: [PATCH 2/6] optimize count --- be/src/vec/exec/format/generic_reader.h | 6 ------ be/src/vec/exec/format/table/iceberg_reader.cpp | 16 +++++++++------- be/src/vec/exec/format/table/iceberg_reader.h | 3 ++- be/src/vec/exec/scan/vfile_scanner.cpp | 4 ++-- 4 files changed, 13 insertions(+), 16 deletions(-) diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h index fbb90e5ba7d705..7842f2edb92059 100644 --- a/be/src/vec/exec/format/generic_reader.h +++ b/be/src/vec/exec/format/generic_reader.h @@ -37,10 +37,6 @@ class GenericReader { _push_down_agg_type = push_down_agg_type; } - void set_push_down_count(int64_t count) { - _push_down_count = count; - } - virtual Status get_next_block(Block* block, size_t* read_rows, bool* eof) = 0; virtual std::unordered_map get_name_to_type() { @@ -80,8 +76,6 @@ class GenericReader { /// Whether the underlying FileReader has filled the partition&missing columns bool _fill_all_columns = false; TPushAggOp::type _push_down_agg_type; - // Record the value of the aggregate function 'count' from be - int64_t _push_down_count = -1; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index f37619b541cd03..ca96d69b6a2fcf 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -90,14 +90,16 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr file_forma RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params, const TFileRangeDesc& range, ShardedKVCache* kv_cache, - io::IOContext* io_ctx) + io::IOContext* io_ctx, + int64_t push_down_count) : TableFormatReader(std::move(file_format_reader)), _profile(profile), _state(state), _params(params), _range(range), _kv_cache(kv_cache), - _io_ctx(io_ctx) { + _io_ctx(io_ctx), + _remaining_push_down_count(push_down_count) { static const char* iceberg_profile = "IcebergProfile"; ADD_TIMER(_profile, iceberg_profile); _iceberg_profile.num_delete_files = @@ -132,9 +134,7 @@ Status IcebergTableReader::init_reader( _all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range, conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id, not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts); - _batch_size = parquet_reader->get_batch_size(); - _remaining_push_down_count = _push_down_count; return status; } @@ -204,6 +204,10 @@ Status IcebergTableReader::get_columns( } Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) { + if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_push_down_count > 0) { + return Status::OK(); + } + auto& table_desc = range.table_format_params.iceberg_params; auto& version = table_desc.format_version; if (version < MIN_SUPPORT_DELETE_FILES_VERSION) { @@ -213,10 +217,8 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) { const std::vector& files = table_desc.delete_files; if (files.empty()) { return Status::OK(); - } else { - // TODO: wuewnchi 在上面直接做返回 - _push_down_agg_type = TPushAggOp::type::NONE; } + if (delete_file_type == POSITION_DELETE) { RETURN_IF_ERROR(_position_delete(files)); } diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h index 22b69ae304ed84..45f2b0ec4f9498 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.h +++ b/be/src/vec/exec/format/table/iceberg_reader.h @@ -70,7 +70,8 @@ class IcebergTableReader : public TableFormatReader { IcebergTableReader(std::unique_ptr file_format_reader, RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params, const TFileRangeDesc& range, ShardedKVCache* kv_cache, - io::IOContext* io_ctx); + io::IOContext* io_ctx, + int64_t push_down_count); ~IcebergTableReader() override = default; Status init_row_filters(const TFileRangeDesc& range) override; diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 01b081ca1e645c..35b6db01cd8fee 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -701,7 +701,8 @@ Status VFileScanner::_get_next_reader() { std::unique_ptr iceberg_reader = IcebergTableReader::create_unique(std::move(parquet_reader), _profile, _state, *_params, range, _kv_cache, - _io_ctx.get()); + _io_ctx.get(), + _parent->get_push_down_count()); init_status = iceberg_reader->init_reader( _file_col_names, _col_id_name_map, _colname_to_value_range, _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(), @@ -803,7 +804,6 @@ Status VFileScanner::_get_next_reader() { _missing_cols.clear(); _cur_reader->get_columns(&_name_to_col_type, &_missing_cols); _cur_reader->set_push_down_agg_type(_parent->get_push_down_agg_type()); - _cur_reader->set_push_down_count(_parent->get_push_down_count()); RETURN_IF_ERROR(_generate_fill_columns()); if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) { fmt::memory_buffer col_buf; From a2b272d79e821bda9ce3ec41542bef053b20ac08 Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Mon, 14 Aug 2023 10:10:23 +0800 Subject: [PATCH 3/6] optimize count --- be/src/vec/exec/format/table/iceberg_reader.cpp | 6 ++++++ .../doris/planner/external/iceberg/IcebergScanNode.java | 5 ++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index ca96d69b6a2fcf..7789eeb0af72b1 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -204,6 +204,8 @@ Status IcebergTableReader::get_columns( } Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) { + + // We get the count value by doris's be, so we don't need to read the delete file if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_push_down_count > 0) { return Status::OK(); } @@ -222,7 +224,11 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) { if (delete_file_type == POSITION_DELETE) { RETURN_IF_ERROR(_position_delete(files)); } + // todo: equality delete + // If it is a count operation and it has equality delete file kind, + // the push down operation of the count for this split needs to be canceled. + COUNTER_UPDATE(_iceberg_profile.num_delete_files, files.size()); return Status::OK(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java index 61ea218b6fb7cd..a6d3435b5cedcc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java @@ -75,7 +75,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -232,12 +231,12 @@ public List getSplits() throws UserException { throw new UserException(e.getMessage(), e.getCause()); } - TPushAggOp aggOp = getPushDownAggNoGroupingOp(); if (aggOp.equals(TPushAggOp.COUNT) && getCountFromSnapshot() > 0) { + // we can create a special empty split and skip the plan process return Collections.singletonList(splits.get(0)); } - + readPartitionNum = partitionPathSet.size(); return splits; From 020c7c12a9e9fd2aa7753ab96b14732949626c91 Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Mon, 14 Aug 2023 14:19:38 +0800 Subject: [PATCH 4/6] optimize count --- be/src/vec/exec/format/parquet/vparquet_reader.h | 2 -- be/src/vec/exec/format/table/iceberg_reader.cpp | 3 +-- be/src/vec/exec/format/table/iceberg_reader.h | 1 - 3 files changed, 1 insertion(+), 5 deletions(-) diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index cdc9366c192418..0f3996db40ed7a 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -149,8 +149,6 @@ class ParquetReader : public GenericReader { _table_col_to_file_col = map; } - size_t get_batch_size() { return _batch_size; } - private: struct ParquetProfile { RuntimeProfile::Counter* filtered_row_groups; diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index 7789eeb0af72b1..d225921b447d57 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -134,7 +134,6 @@ Status IcebergTableReader::init_reader( _all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range, conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id, not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts); - _batch_size = parquet_reader->get_batch_size(); return status; } @@ -144,7 +143,7 @@ Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* // already get rows from be if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_push_down_count > 0) { - auto rows = std::min(_remaining_push_down_count, (int64_t)_batch_size); + auto rows = std::min(_remaining_push_down_count, (int64_t)_state->query_options().batch_size); _remaining_push_down_count -= rows; for (auto& col : block->mutate_columns()) { col->resize(rows); diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h index 45f2b0ec4f9498..b9084babbf2ae4 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.h +++ b/be/src/vec/exec/format/table/iceberg_reader.h @@ -156,7 +156,6 @@ class IcebergTableReader : public TableFormatReader { bool _has_schema_change = false; bool _has_iceberg_schema = false; - size_t _batch_size; int64_t _remaining_push_down_count; }; } // namespace vectorized From 7b4ee880c2134e1e5b6e6ba7dafd647346f6c115 Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Mon, 14 Aug 2023 17:56:07 +0800 Subject: [PATCH 5/6] fix snapshot from user --- .../planner/external/iceberg/IcebergScanNode.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java index a6d3435b5cedcc..7293982ebbc3c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java @@ -362,7 +362,16 @@ public boolean pushDownAggNoGroupingCheckCol(FunctionCallExpr aggExpr, Column co } private long getCountFromSnapshot() { - Snapshot snapshot = icebergTable.currentSnapshot(); + Long specifiedSnapshot; + try { + specifiedSnapshot = getSpecifiedSnapshot(); + } catch (UserException e) { + return -1; + } + + Snapshot snapshot = specifiedSnapshot == null + ? icebergTable.currentSnapshot() : icebergTable.snapshot(specifiedSnapshot); + // empty table if (snapshot == null) { return -1; From ebdb4097c4f5a54ffc8510873ed4eb56dadb5247 Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Tue, 15 Aug 2023 12:57:26 +0800 Subject: [PATCH 6/6] format code --- be/src/vec/exec/format/table/iceberg_reader.cpp | 9 +++------ be/src/vec/exec/format/table/iceberg_reader.h | 3 +-- be/src/vec/exec/scan/vfile_scanner.cpp | 7 +++---- 3 files changed, 7 insertions(+), 12 deletions(-) diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index d225921b447d57..1b0f05cc950fee 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -90,8 +90,7 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr file_forma RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params, const TFileRangeDesc& range, ShardedKVCache* kv_cache, - io::IOContext* io_ctx, - int64_t push_down_count) + io::IOContext* io_ctx, int64_t push_down_count) : TableFormatReader(std::move(file_format_reader)), _profile(profile), _state(state), @@ -139,11 +138,10 @@ Status IcebergTableReader::init_reader( } Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { - // already get rows from be if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_push_down_count > 0) { - - auto rows = std::min(_remaining_push_down_count, (int64_t)_state->query_options().batch_size); + auto rows = + std::min(_remaining_push_down_count, (int64_t)_state->query_options().batch_size); _remaining_push_down_count -= rows; for (auto& col : block->mutate_columns()) { col->resize(rows); @@ -203,7 +201,6 @@ Status IcebergTableReader::get_columns( } Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) { - // We get the count value by doris's be, so we don't need to read the delete file if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_push_down_count > 0) { return Status::OK(); diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h index b9084babbf2ae4..8def49e68cfba7 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.h +++ b/be/src/vec/exec/format/table/iceberg_reader.h @@ -69,8 +69,7 @@ class IcebergTableReader : public TableFormatReader { IcebergTableReader(std::unique_ptr file_format_reader, RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params, - const TFileRangeDesc& range, ShardedKVCache* kv_cache, - io::IOContext* io_ctx, + const TFileRangeDesc& range, ShardedKVCache* kv_cache, io::IOContext* io_ctx, int64_t push_down_count); ~IcebergTableReader() override = default; diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 35b6db01cd8fee..bc439b73d256e2 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -699,10 +699,9 @@ Status VFileScanner::_get_next_reader() { if (range.__isset.table_format_params && range.table_format_params.table_format_type == "iceberg") { std::unique_ptr iceberg_reader = - IcebergTableReader::create_unique(std::move(parquet_reader), _profile, - _state, *_params, range, _kv_cache, - _io_ctx.get(), - _parent->get_push_down_count()); + IcebergTableReader::create_unique( + std::move(parquet_reader), _profile, _state, *_params, range, + _kv_cache, _io_ctx.get(), _parent->get_push_down_count()); init_status = iceberg_reader->init_reader( _file_col_names, _col_id_name_map, _colname_to_value_range, _push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),