diff --git a/be/src/cloud/cloud_base_compaction.cpp b/be/src/cloud/cloud_base_compaction.cpp index 4ceab8eb6e39b5..d4a86743a488c7 100644 --- a/be/src/cloud/cloud_base_compaction.cpp +++ b/be/src/cloud/cloud_base_compaction.cpp @@ -163,16 +163,6 @@ Status CloudBaseCompaction::pick_rowsets_to_compact() { return Status::Error("no suitable versions for compaction"); } - int score = 0; - int rowset_cnt = 0; - while (rowset_cnt < _input_rowsets.size()) { - score += _input_rowsets[rowset_cnt++]->rowset_meta()->get_compaction_score(); - if (score > config::base_compaction_max_compaction_score) { - break; - } - } - _input_rowsets.resize(rowset_cnt); - // 1. cumulative rowset must reach base_compaction_min_rowset_num threshold if (_input_rowsets.size() > config::base_compaction_min_rowset_num) { VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << _tablet->tablet_id() diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp index 2a26b1b294b58e..de318f979a5909 100644 --- a/be/src/cloud/cloud_cumulative_compaction.cpp +++ b/be/src/cloud/cloud_cumulative_compaction.cpp @@ -354,20 +354,11 @@ Status CloudCumulativeCompaction::pick_rowsets_to_compact() { return st; } - int64_t max_score = config::cumulative_compaction_max_deltas; - auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage(); - bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8; - if (cloud_tablet()->last_compaction_status.is() || - memory_usage_high) { - max_score = std::max(config::cumulative_compaction_max_deltas / - config::cumulative_compaction_max_deltas_factor, - config::cumulative_compaction_min_deltas + 1); - } - size_t compaction_score = 0; auto compaction_policy = cloud_tablet()->tablet_meta()->compaction_policy(); _engine.cumu_compaction_policy(compaction_policy) - ->pick_input_rowsets(cloud_tablet(), candidate_rowsets, max_score, + ->pick_input_rowsets(cloud_tablet(), candidate_rowsets, + config::cumulative_compaction_max_deltas, config::cumulative_compaction_min_deltas, &_input_rowsets, &_last_delete_version, &compaction_score); diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 91707b5a8a262e..0f110fc694c831 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -385,7 +385,6 @@ DEFINE_mInt32(max_single_replica_compaction_threads, "-1"); DEFINE_Bool(enable_base_compaction_idle_sched, "true"); DEFINE_mInt64(base_compaction_min_rowset_num, "5"); -DEFINE_mInt64(base_compaction_max_compaction_score, "20"); DEFINE_mDouble(base_compaction_min_data_ratio, "0.3"); DEFINE_mInt64(base_compaction_dup_key_max_file_size_mbytes, "1024"); @@ -416,7 +415,6 @@ DEFINE_mInt64(compaction_min_size_mbytes, "64"); // cumulative compaction policy: min and max delta file's number DEFINE_mInt64(cumulative_compaction_min_deltas, "5"); DEFINE_mInt64(cumulative_compaction_max_deltas, "1000"); -DEFINE_mInt32(cumulative_compaction_max_deltas_factor, "10"); // This config can be set to limit thread number in multiget thread pool. DEFINE_mInt32(multi_get_max_threads, "10"); @@ -1317,10 +1315,6 @@ DEFINE_Bool(enable_file_logger, "true"); // The minimum row group size when exporting Parquet files. default 128MB DEFINE_Int64(min_row_group_size, "134217728"); -DEFINE_mInt64(compaction_memory_bytes_limit, "1073741824"); - -DEFINE_mInt64(compaction_batch_size, "-1"); - // clang-format off #ifdef BE_TEST // test s3 diff --git a/be/src/common/config.h b/be/src/common/config.h index f35aeb61747a86..ddaa60a2e0580c 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -438,7 +438,6 @@ DECLARE_mInt32(max_single_replica_compaction_threads); DECLARE_Bool(enable_base_compaction_idle_sched); DECLARE_mInt64(base_compaction_min_rowset_num); -DECLARE_mInt64(base_compaction_max_compaction_score); DECLARE_mDouble(base_compaction_min_data_ratio); DECLARE_mInt64(base_compaction_dup_key_max_file_size_mbytes); @@ -469,7 +468,6 @@ DECLARE_mInt64(compaction_min_size_mbytes); // cumulative compaction policy: min and max delta file's number DECLARE_mInt64(cumulative_compaction_min_deltas); DECLARE_mInt64(cumulative_compaction_max_deltas); -DECLARE_mInt32(cumulative_compaction_max_deltas_factor); // This config can be set to limit thread number in multiget thread pool. DECLARE_mInt32(multi_get_max_threads); @@ -1403,10 +1401,6 @@ DECLARE_Bool(enable_file_logger); // The minimum row group size when exporting Parquet files. DECLARE_Int64(min_row_group_size); -DECLARE_mInt64(compaction_memory_bytes_limit); - -DECLARE_mInt64(compaction_batch_size); - #ifdef BE_TEST // test s3 DECLARE_String(test_s3_resource); diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp index 8be29383c1e9b1..436180c78ca87d 100644 --- a/be/src/olap/base_compaction.cpp +++ b/be/src/olap/base_compaction.cpp @@ -151,16 +151,6 @@ Status BaseCompaction::pick_rowsets_to_compact() { "situation, no need to do base compaction."); } - int score = 0; - int rowset_cnt = 0; - while (rowset_cnt < _input_rowsets.size()) { - score += _input_rowsets[rowset_cnt++]->rowset_meta()->get_compaction_score(); - if (score > config::base_compaction_max_compaction_score) { - break; - } - } - _input_rowsets.resize(rowset_cnt); - // 1. cumulative rowset must reach base_compaction_num_cumulative_deltas threshold if (_input_rowsets.size() > config::base_compaction_min_rowset_num) { VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << _tablet->tablet_id() diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index 4852a6cba9b7fd..dc5f488e04492c 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -22,7 +22,6 @@ #include #include "common/status.h" -#include "olap/iterators.h" #include "olap/partial_update_info.h" #include "olap/rowset/segment_v2/segment.h" #include "olap/tablet_fwd.h" @@ -300,10 +299,6 @@ class BaseTablet { std::atomic read_block_count = 0; std::atomic write_count = 0; std::atomic compaction_count = 0; - - std::mutex sample_info_lock; - std::vector sample_infos; - Status last_compaction_status = Status::OK(); }; } /* namespace doris */ diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index b42c23f18742bc..37dcac5283ee98 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -149,15 +149,6 @@ void Compaction::init_profile(const std::string& label) { _merge_rowsets_latency_timer = ADD_TIMER(_profile, "merge_rowsets_latency"); } -int64_t Compaction::merge_way_num() { - int64_t way_num = 0; - for (auto&& rowset : _input_rowsets) { - way_num += rowset->rowset_meta()->get_merge_way_num(); - } - - return way_num; -} - Status Compaction::merge_input_rowsets() { std::vector input_rs_readers; input_rs_readers.reserve(_input_rowsets.size()); @@ -179,23 +170,19 @@ Status Compaction::merge_input_rowsets() { _stats.rowid_conversion = &_rowid_conversion; } - int64_t way_num = merge_way_num(); - Status res; { SCOPED_TIMER(_merge_rowsets_latency_timer); if (_is_vertical) { res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), *_cur_tablet_schema, input_rs_readers, _output_rs_writer.get(), - get_avg_segment_rows(), way_num, &_stats); + get_avg_segment_rows(), &_stats); } else { res = Merger::vmerge_rowsets(_tablet, compaction_type(), *_cur_tablet_schema, input_rs_readers, _output_rs_writer.get(), &_stats); } } - _tablet->last_compaction_status = res; - if (!res.ok()) { LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << res << ", tablet=" << _tablet->tablet_id() diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 8e0c1099a20942..9ec1297c69cb0a 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -81,8 +81,6 @@ class Compaction { void _load_segment_to_cache(); - int64_t merge_way_num(); - // the root tracker for this compaction std::shared_ptr _mem_tracker; diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index 2c7e654787a650..1e0f338da23978 100644 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -134,20 +134,11 @@ Status CumulativeCompaction::pick_rowsets_to_compact() { << ", tablet=" << _tablet->tablet_id(); } - int64_t max_score = config::cumulative_compaction_max_deltas; - auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage(); - bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8; - if (tablet()->last_compaction_status.is() || memory_usage_high) { - max_score = std::max(config::cumulative_compaction_max_deltas / - config::cumulative_compaction_max_deltas_factor, - config::cumulative_compaction_min_deltas + 1); - } - size_t compaction_score = 0; tablet()->cumulative_compaction_policy()->pick_input_rowsets( - tablet(), candidate_rowsets, max_score, config::cumulative_compaction_min_deltas, - &_input_rowsets, &_last_delete_version, &compaction_score, - _allow_delete_in_cumu_compaction); + tablet(), candidate_rowsets, config::cumulative_compaction_max_deltas, + config::cumulative_compaction_min_deltas, &_input_rowsets, &_last_delete_version, + &compaction_score, _allow_delete_in_cumu_compaction); // Cumulative compaction will process with at least 1 rowset. // So when there is no rowset being chosen, we should return Status::Error(): diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h index cbf8f1eca65ae2..330aa9e3475806 100644 --- a/be/src/olap/iterators.h +++ b/be/src/olap/iterators.h @@ -17,7 +17,6 @@ #pragma once -#include #include #include "common/status.h" @@ -122,12 +121,6 @@ class StorageReadOptions { size_t topn_limit = 0; }; -struct CompactionSampleInfo { - int64_t bytes = 0; - int64_t rows = 0; - int64_t group_data_size; -}; - class RowwiseIterator; using RowwiseIteratorUPtr = std::unique_ptr; class RowwiseIterator { @@ -140,13 +133,7 @@ class RowwiseIterator { // Input options may contain scan range in which this scan. // Return Status::OK() if init successfully, // Return other error otherwise - virtual Status init(const StorageReadOptions& opts) { - return Status::NotSupported("to be implemented"); - } - - virtual Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) { - return Status::NotSupported("to be implemented"); - } + virtual Status init(const StorageReadOptions& opts) = 0; // If there is any valid data, this function will load data // into input batch with Status::OK() returned diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp index 4c620d30252950..cecbeb163dd673 100644 --- a/be/src/olap/merger.cpp +++ b/be/src/olap/merger.cpp @@ -24,7 +24,6 @@ #include #include #include -#include #include #include #include @@ -34,9 +33,7 @@ #include "common/config.h" #include "common/logging.h" -#include "common/status.h" #include "olap/base_tablet.h" -#include "olap/iterators.h" #include "olap/olap_common.h" #include "olap/olap_define.h" #include "olap/rowid_conversion.h" @@ -46,7 +43,6 @@ #include "olap/rowset/segment_v2/segment_writer.h" #include "olap/storage_engine.h" #include "olap/tablet.h" -#include "olap/tablet_fwd.h" #include "olap/tablet_reader.h" #include "olap/utils.h" #include "util/slice.h" @@ -245,8 +241,7 @@ Status Merger::vertical_compact_one_group( vectorized::RowSourcesBuffer* row_source_buf, const std::vector& src_rowset_readers, RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, Statistics* stats_output, - std::vector key_group_cluster_key_idxes, int64_t batch_size, - CompactionSampleInfo* sample_info) { + std::vector key_group_cluster_key_idxes) { // build tablet reader VLOG_NOTICE << "vertical compact one group, max_rows_per_segment=" << max_rows_per_segment; vectorized::VerticalBlockReader reader(row_source_buf); @@ -284,8 +279,7 @@ Status Merger::vertical_compact_one_group( reader_params.return_columns = column_group; reader_params.origin_return_columns = &reader_params.return_columns; - reader_params.batch_size = batch_size; - RETURN_IF_ERROR(reader.init(reader_params, sample_info)); + RETURN_IF_ERROR(reader.init(reader_params)); if (reader_params.record_rowids) { stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id()); @@ -391,55 +385,6 @@ Status Merger::vertical_compact_one_group(int64_t tablet_id, ReaderType reader_t return Status::OK(); } -int64_t estimate_batch_size(int group_index, BaseTabletSPtr tablet, int64_t way_cnt) { - std::unique_lock lock(tablet->sample_info_lock); - CompactionSampleInfo info = tablet->sample_infos[group_index]; - if (way_cnt <= 0) { - LOG(INFO) << "estimate batch size for vertical compaction, tablet id: " - << tablet->tablet_id() << " way cnt: " << way_cnt; - return 4096 - 32; - } - int64_t block_mem_limit = config::compaction_memory_bytes_limit / way_cnt; - if (tablet->last_compaction_status.is()) { - block_mem_limit /= 4; - } - - int64_t group_data_size = 0; - if (info.group_data_size > 0 && info.bytes > 0 && info.rows > 0) { - float smoothing_factor = 0.5; - group_data_size = int64_t(info.group_data_size * (1 - smoothing_factor) + - info.bytes / info.rows * smoothing_factor); - tablet->sample_infos[group_index].group_data_size = group_data_size; - } else if (info.group_data_size > 0 && (info.bytes <= 0 || info.rows <= 0)) { - group_data_size = info.group_data_size; - } else if (info.group_data_size <= 0 && info.bytes > 0 && info.rows > 0) { - group_data_size = info.bytes / info.rows; - tablet->sample_infos[group_index].group_data_size = group_data_size; - } else { - LOG(INFO) << "estimate batch size for vertical compaction, tablet id: " - << tablet->tablet_id() << " group data size: " << info.group_data_size - << " row num: " << info.rows << " consume bytes: " << info.bytes; - return 1024 - 32; - } - - if (group_data_size <= 0) { - LOG(WARNING) << "estimate batch size for vertical compaction, tablet id: " - << tablet->tablet_id() << " unexpected group data size: " << group_data_size; - return 4096 - 32; - } - - tablet->sample_infos[group_index].bytes = 0; - tablet->sample_infos[group_index].rows = 0; - - int64_t batch_size = block_mem_limit / group_data_size; - int64_t res = std::max(std::min(batch_size, int64_t(4096 - 32)), 32L); - LOG(INFO) << "estimate batch size for vertical compaction, tablet id: " << tablet->tablet_id() - << " group data size: " << info.group_data_size << " row num: " << info.rows - << " consume bytes: " << info.bytes << " way cnt: " << way_cnt - << " batch size: " << res; - return res; -} - // steps to do vertical merge: // 1. split columns into column groups // 2. compact groups one by one, generate a row_source_buf when compact key group @@ -449,7 +394,7 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t const TabletSchema& tablet_schema, const std::vector& src_rowset_readers, RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, - int64_t merge_way_num, Statistics* stats_output) { + Statistics* stats_output) { LOG(INFO) << "Start to do vertical compaction, tablet_id: " << tablet->tablet_id(); std::vector> column_groups; vertical_split_columns(tablet_schema, &column_groups); @@ -460,18 +405,14 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t vectorized::RowSourcesBuffer row_sources_buf( tablet->tablet_id(), dst_rowset_writer->context().tablet_path, reader_type); - tablet->sample_infos.resize(column_groups.size(), {0, 0, 0}); // compact group one by one for (auto i = 0; i < column_groups.size(); ++i) { VLOG_NOTICE << "row source size: " << row_sources_buf.total_size(); bool is_key = (i == 0); - int64_t batch_size = config::compaction_batch_size != -1 - ? config::compaction_batch_size - : estimate_batch_size(i, tablet, merge_way_num); RETURN_IF_ERROR(vertical_compact_one_group( tablet, reader_type, tablet_schema, is_key, column_groups[i], &row_sources_buf, src_rowset_readers, dst_rowset_writer, max_rows_per_segment, stats_output, - key_group_cluster_key_idxes, batch_size, &(tablet->sample_infos[i]))); + key_group_cluster_key_idxes)); if (is_key) { RETURN_IF_ERROR(row_sources_buf.flush()); } diff --git a/be/src/olap/merger.h b/be/src/olap/merger.h index 7513c90fbd1217..5749f518136bd9 100644 --- a/be/src/olap/merger.h +++ b/be/src/olap/merger.h @@ -21,7 +21,6 @@ #include "common/status.h" #include "io/io_common.h" -#include "olap/iterators.h" #include "olap/rowset/rowset_fwd.h" #include "olap/tablet_fwd.h" @@ -60,7 +59,7 @@ class Merger { static Status vertical_merge_rowsets( BaseTabletSPtr tablet, ReaderType reader_type, const TabletSchema& tablet_schema, const std::vector& src_rowset_readers, - RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, int64_t merge_way_num, + RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, Statistics* stats_output); // for vertical compaction @@ -72,8 +71,7 @@ class Merger { vectorized::RowSourcesBuffer* row_source_buf, const std::vector& src_rowset_readers, RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, Statistics* stats_output, - std::vector key_group_cluster_key_idxes, int64_t batch_size, - CompactionSampleInfo* sample_info); + std::vector key_group_cluster_key_idxes); // for segcompaction static Status vertical_compact_one_group(int64_t tablet_id, ReaderType reader_type, diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h index aa20b5b1ef13ac..90b2ce48a0a5f0 100644 --- a/be/src/olap/rowset/rowset_meta.h +++ b/be/src/olap/rowset/rowset_meta.h @@ -269,21 +269,6 @@ class RowsetMeta { return score; } - uint32_t get_merge_way_num() const { - uint32_t way_num = 0; - if (!is_segments_overlapping()) { - if (num_segments() == 0) { - way_num = 0; - } else { - way_num = 1; - } - } else { - way_num = num_segments(); - CHECK(way_num > 0); - } - return way_num; - } - void get_segments_key_bounds(std::vector* segments_key_bounds) const { for (const KeyBoundsPB& key_range : _rowset_meta_pb.segments_key_bounds()) { segments_key_bounds->push_back(key_range); diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp index 95f2a945134b4c..22a7049aa8f3b4 100644 --- a/be/src/olap/rowset/segcompaction.cpp +++ b/be/src/olap/rowset/segcompaction.cpp @@ -101,7 +101,7 @@ Status SegcompactionWorker::_get_segcompaction_reader( reader_params.tablet = tablet; reader_params.return_columns = return_columns; reader_params.is_key_column_group = is_key; - return (*reader)->init(reader_params, nullptr); + return (*reader)->init(reader_params); } std::unique_ptr SegcompactionWorker::_create_segcompaction_writer( diff --git a/be/src/olap/tablet_reader.h b/be/src/olap/tablet_reader.h index c257ba007f531a..a3cd3bd4a49577 100644 --- a/be/src/olap/tablet_reader.h +++ b/be/src/olap/tablet_reader.h @@ -183,8 +183,6 @@ class TabletReader { void check_validation() const; std::string to_string() const; - - int64_t batch_size = -1; }; TabletReader() = default; diff --git a/be/src/vec/olap/vertical_block_reader.cpp b/be/src/vec/olap/vertical_block_reader.cpp index 872836c91cdf41..c4dda20f40f9a2 100644 --- a/be/src/vec/olap/vertical_block_reader.cpp +++ b/be/src/vec/olap/vertical_block_reader.cpp @@ -25,8 +25,6 @@ #include #include "cloud/config.h" -#include "olap/compaction.h" -#include "olap/iterators.h" #include "olap/olap_common.h" #include "olap/olap_define.h" #include "olap/rowset/rowset.h" @@ -110,8 +108,7 @@ Status VerticalBlockReader::_get_segment_iterators(const ReaderParams& read_para return Status::OK(); } -Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params, - CompactionSampleInfo* sample_info) { +Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params) { std::vector iterator_init_flag; std::vector rowset_ids; std::vector* segment_iters_ptr = read_params.segment_iters_ptr; @@ -160,8 +157,7 @@ Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params, // init collect iterator StorageReadOptions opts; opts.record_rowids = read_params.record_rowids; - opts.block_row_max = read_params.batch_size; - RETURN_IF_ERROR(_vcollect_iter->init(opts, sample_info)); + RETURN_IF_ERROR(_vcollect_iter->init(opts)); // In agg keys value columns compact, get first row for _init_agg_state if (!read_params.is_key_column_group && read_params.tablet->keys_type() == KeysType::AGG_KEYS) { @@ -208,17 +204,13 @@ void VerticalBlockReader::_init_agg_state(const ReaderParams& read_params) { } Status VerticalBlockReader::init(const ReaderParams& read_params) { - return init(read_params, nullptr); -} - -Status VerticalBlockReader::init(const ReaderParams& read_params, - CompactionSampleInfo* sample_info) { StorageReadOptions opts; - _reader_context.batch_size = read_params.batch_size; + _reader_context.batch_size = opts.block_row_max; RETURN_IF_ERROR(TabletReader::init(read_params)); _arena = std::make_unique(); - auto status = _init_collect_iter(read_params, sample_info); + + auto status = _init_collect_iter(read_params); if (!status.ok()) [[unlikely]] { if (!config::is_cloud_mode()) { static_cast(_tablet.get())->report_error(status); diff --git a/be/src/vec/olap/vertical_block_reader.h b/be/src/vec/olap/vertical_block_reader.h index e1e8cfa1239b72..81ef8d7910077d 100644 --- a/be/src/vec/olap/vertical_block_reader.h +++ b/be/src/vec/olap/vertical_block_reader.h @@ -56,7 +56,6 @@ class VerticalBlockReader final : public TabletReader { // Initialize VerticalBlockReader with tablet, data version and fetch range. Status init(const ReaderParams& read_params) override; - Status init(const ReaderParams& read_params, CompactionSampleInfo* sample_info); Status next_block_with_aggregation(Block* block, bool* eof) override; @@ -80,7 +79,7 @@ class VerticalBlockReader final : public TabletReader { // to minimize the comparison time in merge heap. Status _unique_key_next_block(Block* block, bool* eof); - Status _init_collect_iter(const ReaderParams& read_params, CompactionSampleInfo* sample_info); + Status _init_collect_iter(const ReaderParams& read_params); Status _get_segment_iterators(const ReaderParams& read_params, std::vector* segment_iters, diff --git a/be/src/vec/olap/vertical_merge_iterator.cpp b/be/src/vec/olap/vertical_merge_iterator.cpp index 81cfc756d63562..3323492ee9015c 100644 --- a/be/src/vec/olap/vertical_merge_iterator.cpp +++ b/be/src/vec/olap/vertical_merge_iterator.cpp @@ -21,7 +21,6 @@ #include #include -#include #include #include "cloud/config.h" @@ -30,7 +29,6 @@ #include "common/logging.h" #include "io/cache/block_file_cache_factory.h" #include "olap/field.h" -#include "olap/iterators.h" #include "olap/olap_common.h" #include "vec/columns/column.h" #include "vec/common/string_ref.h" @@ -342,18 +340,13 @@ Status VerticalMergeIteratorContext::copy_rows(Block* block, bool advanced) { return Status::OK(); } -Status VerticalMergeIteratorContext::init(const StorageReadOptions& opts, - CompactionSampleInfo* sample_info) { +Status VerticalMergeIteratorContext::init(const StorageReadOptions& opts) { if (LIKELY(_inited)) { return Status::OK(); } _block_row_max = opts.block_row_max; _record_rowids = opts.record_rowids; RETURN_IF_ERROR(_load_next_block()); - if (sample_info != nullptr) { - sample_info->bytes += bytes(); - sample_info->rows += rows(); - } if (valid()) { RETURN_IF_ERROR(advance()); } @@ -512,8 +505,7 @@ Status VerticalHeapMergeIterator::next_batch(Block* block) { return Status::EndOfFile("no more data in segment"); } -Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts, - CompactionSampleInfo* sample_info) { +Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts) { DCHECK(_origin_iters.size() == _iterator_init_flags.size()); _record_rowids = opts.record_rowids; if (_origin_iters.empty()) { @@ -541,7 +533,7 @@ Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts, for (size_t i = 0; i < num_iters; ++i) { if (_iterator_init_flags[i] || pre_iter_invalid) { auto& ctx = _ori_iter_ctx[i]; - RETURN_IF_ERROR(ctx->init(opts, sample_info)); + RETURN_IF_ERROR(ctx->init(opts)); if (!ctx->valid()) { pre_iter_invalid = true; continue; @@ -614,8 +606,7 @@ Status VerticalFifoMergeIterator::next_batch(Block* block) { return Status::EndOfFile("no more data in segment"); } -Status VerticalFifoMergeIterator::init(const StorageReadOptions& opts, - CompactionSampleInfo* sample_info) { +Status VerticalFifoMergeIterator::init(const StorageReadOptions& opts) { DCHECK(_origin_iters.size() == _iterator_init_flags.size()); DCHECK(_keys_type == KeysType::DUP_KEYS); _record_rowids = opts.record_rowids; @@ -635,7 +626,7 @@ Status VerticalFifoMergeIterator::init(const StorageReadOptions& opts, std::unique_ptr ctx( new VerticalMergeIteratorContext(std::move(iter), _rowset_ids[seg_order], _ori_return_cols, seg_order, _seq_col_idx)); - RETURN_IF_ERROR(ctx->init(opts, sample_info)); + RETURN_IF_ERROR(ctx->init(opts)); if (!ctx->valid()) { ++seg_order; continue; @@ -676,7 +667,7 @@ Status VerticalMaskMergeIterator::next_row(vectorized::IteratorRowRef* ref) { uint16_t order = row_source.get_source_num(); auto& ctx = _origin_iter_ctx[order]; // init ctx and this ctx must be valid - RETURN_IF_ERROR(ctx->init(_opts, _sample_info)); + RETURN_IF_ERROR(ctx->init(_opts)); DCHECK(ctx->valid()); if (UNLIKELY(ctx->is_first_row())) { @@ -710,7 +701,7 @@ Status VerticalMaskMergeIterator::unique_key_next_row(vectorized::IteratorRowRef auto row_source = _row_sources_buf->current(); uint16_t order = row_source.get_source_num(); auto& ctx = _origin_iter_ctx[order]; - RETURN_IF_ERROR(ctx->init(_opts, _sample_info)); + RETURN_IF_ERROR(ctx->init(_opts)); DCHECK(ctx->valid()); if (!ctx->valid()) { LOG(INFO) << "VerticalMergeIteratorContext not valid"; @@ -749,7 +740,7 @@ Status VerticalMaskMergeIterator::next_batch(Block* block) { uint16_t order = _row_sources_buf->current().get_source_num(); DCHECK(order < _origin_iter_ctx.size()); auto& ctx = _origin_iter_ctx[order]; - RETURN_IF_ERROR(ctx->init(_opts, _sample_info)); + RETURN_IF_ERROR(ctx->init(_opts)); DCHECK(ctx->valid()); if (!ctx->valid()) { LOG(INFO) << "VerticalMergeIteratorContext not valid"; @@ -772,8 +763,7 @@ Status VerticalMaskMergeIterator::next_batch(Block* block) { return st; } -Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts, - CompactionSampleInfo* sample_info) { +Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts) { if (_origin_iters.empty()) { return Status::OK(); } @@ -788,7 +778,6 @@ Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts, } _origin_iters.clear(); - _sample_info = sample_info; _block_row_max = opts.block_row_max; return Status::OK(); } diff --git a/be/src/vec/olap/vertical_merge_iterator.h b/be/src/vec/olap/vertical_merge_iterator.h index 3751aa92c78b15..f46a0446cf25a0 100644 --- a/be/src/vec/olap/vertical_merge_iterator.h +++ b/be/src/vec/olap/vertical_merge_iterator.h @@ -164,7 +164,7 @@ class VerticalMergeIteratorContext { ~VerticalMergeIteratorContext() = default; Status block_reset(const std::shared_ptr& block); - Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info = nullptr); + Status init(const StorageReadOptions& opts); bool compare(const VerticalMergeIteratorContext& rhs) const; Status copy_rows(Block* block, bool advanced = true); Status copy_rows(Block* block, size_t count); @@ -200,22 +200,6 @@ class VerticalMergeIteratorContext { return _block_row_locations[_index_in_block]; } - size_t bytes() { - if (_block) { - return _block->bytes(); - } else { - return 0; - } - } - - size_t rows() { - if (_block) { - return _block->rows(); - } else { - return 0; - } - } - private: // Load next block into _block Status _load_next_block(); @@ -271,7 +255,7 @@ class VerticalHeapMergeIterator : public RowwiseIterator { VerticalHeapMergeIterator(const VerticalHeapMergeIterator&) = delete; VerticalHeapMergeIterator& operator=(const VerticalHeapMergeIterator&) = delete; - Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) override; + Status init(const StorageReadOptions& opts) override; Status next_batch(Block* block) override; const Schema& schema() const override { return *_schema; } uint64_t merged_rows() const override { return _merged_rows; } @@ -337,7 +321,7 @@ class VerticalFifoMergeIterator : public RowwiseIterator { VerticalFifoMergeIterator(const VerticalFifoMergeIterator&) = delete; VerticalFifoMergeIterator& operator=(const VerticalFifoMergeIterator&) = delete; - Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) override; + Status init(const StorageReadOptions& opts) override; Status next_batch(Block* block) override; const Schema& schema() const override { return *_schema; } uint64_t merged_rows() const override { return _merged_rows; } @@ -383,7 +367,7 @@ class VerticalMaskMergeIterator : public RowwiseIterator { VerticalMaskMergeIterator(const VerticalMaskMergeIterator&) = delete; VerticalMaskMergeIterator& operator=(const VerticalMaskMergeIterator&) = delete; - Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) override; + Status init(const StorageReadOptions& opts) override; Status next_batch(Block* block) override; @@ -412,7 +396,6 @@ class VerticalMaskMergeIterator : public RowwiseIterator { size_t _filtered_rows = 0; RowSourcesBuffer* _row_sources_buf; StorageReadOptions _opts; - CompactionSampleInfo* _sample_info = nullptr; }; // segment merge iterator diff --git a/be/test/olap/base_compaction_test.cpp b/be/test/olap/base_compaction_test.cpp deleted file mode 100644 index 7d9abe54ed2163..00000000000000 --- a/be/test/olap/base_compaction_test.cpp +++ /dev/null @@ -1,84 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "olap/base_compaction.h" - -#include -#include -#include -#include - -#include "gtest/gtest.h" -#include "gtest/gtest_pred_impl.h" -#include "olap/cumulative_compaction.h" -#include "olap/cumulative_compaction_policy.h" -#include "olap/olap_common.h" -#include "olap/rowset/rowset_factory.h" -#include "olap/rowset/rowset_meta.h" -#include "olap/storage_engine.h" -#include "olap/tablet.h" -#include "olap/tablet_meta.h" -#include "util/uid_util.h" - -namespace doris { - -class TestBaseCompaction : public testing::Test {}; - -static RowsetSharedPtr create_rowset(Version version, int num_segments, bool overlapping, - int data_size) { - auto rs_meta = std::make_shared(); - rs_meta->set_rowset_type(BETA_ROWSET); // important - rs_meta->_rowset_meta_pb.set_start_version(version.first); - rs_meta->_rowset_meta_pb.set_end_version(version.second); - rs_meta->set_num_segments(num_segments); - rs_meta->set_segments_overlap(overlapping ? OVERLAPPING : NONOVERLAPPING); - rs_meta->set_total_disk_size(data_size); - RowsetSharedPtr rowset; - Status st = RowsetFactory::create_rowset(nullptr, "", std::move(rs_meta), &rowset); - if (!st.ok()) { - return nullptr; - } - return rowset; -} - -TEST_F(TestBaseCompaction, filter_input_rowset) { - StorageEngine engine({}); - TabletMetaSharedPtr tablet_meta; - tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, - UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, - TCompressionType::LZ4F)); - TabletSharedPtr tablet(new Tablet(engine, tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY)); - tablet->_cumulative_point = 25; - BaseCompaction compaction(engine, tablet); - //std::vector rowsets; - - RowsetSharedPtr init_rs = create_rowset({0, 1}, 1, false, 0); - tablet->_rs_version_map.emplace(init_rs->version(), init_rs); - for (int i = 2; i < 30; ++i) { - RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024); - tablet->_rs_version_map.emplace(rs->version(), rs); - } - Status st = compaction.pick_rowsets_to_compact(); - EXPECT_TRUE(st.ok()); - EXPECT_EQ(compaction._input_rowsets.front()->start_version(), 0); - EXPECT_EQ(compaction._input_rowsets.front()->end_version(), 1); - - EXPECT_EQ(compaction._input_rowsets.back()->start_version(), 21); - EXPECT_EQ(compaction._input_rowsets.back()->end_version(), 21); -} - -} // namespace doris diff --git a/be/test/olap/rowid_conversion_test.cpp b/be/test/olap/rowid_conversion_test.cpp index 5ae80398afb1b9..7c56710f2e8c81 100644 --- a/be/test/olap/rowid_conversion_test.cpp +++ b/be/test/olap/rowid_conversion_test.cpp @@ -348,9 +348,9 @@ class TestRowIdConversion : public testing::TestWithParambuild(out_rowset)); @@ -598,7 +598,7 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMerge) { stats.rowid_conversion = &rowid_conversion; auto s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema, input_rs_readers, - output_rs_writer.get(), 100, num_segments, &stats); + output_rs_writer.get(), 100, &stats); ASSERT_TRUE(s.ok()) << s; RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); @@ -706,7 +706,7 @@ TEST_F(VerticalCompactionTest, TestUniqueKeyVerticalMerge) { stats.rowid_conversion = &rowid_conversion; auto s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema, input_rs_readers, - output_rs_writer.get(), 10000, num_segments, &stats); + output_rs_writer.get(), 10000, &stats); EXPECT_TRUE(s.ok()); RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); @@ -815,8 +815,7 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMergeWithDelete) { RowIdConversion rowid_conversion; stats.rowid_conversion = &rowid_conversion; st = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema, - input_rs_readers, output_rs_writer.get(), 100, num_segments, - &stats); + input_rs_readers, output_rs_writer.get(), 100, &stats); ASSERT_TRUE(st.ok()) << st; RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); @@ -918,8 +917,7 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMergeWithDelete) { RowIdConversion rowid_conversion; stats.rowid_conversion = &rowid_conversion; st = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema, - input_rs_readers, output_rs_writer.get(), 100, num_segments, - &stats); + input_rs_readers, output_rs_writer.get(), 100, &stats); ASSERT_TRUE(st.ok()) << st; RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); @@ -1012,7 +1010,7 @@ TEST_F(VerticalCompactionTest, TestAggKeyVerticalMerge) { stats.rowid_conversion = &rowid_conversion; auto s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema, input_rs_readers, - output_rs_writer.get(), 100, num_segments, &stats); + output_rs_writer.get(), 100, &stats); EXPECT_TRUE(s.ok()); RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); diff --git a/regression-test/suites/compaction/compaction_width_array_column.groovy b/regression-test/suites/compaction/compaction_width_array_column.groovy deleted file mode 100644 index 4e3fed354c7d84..00000000000000 --- a/regression-test/suites/compaction/compaction_width_array_column.groovy +++ /dev/null @@ -1,137 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -suite('compaction_width_array_column', "p2") { - String backend_id; - def backendId_to_backendIP = [:] - def backendId_to_backendHttpPort = [:] - getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); - - backend_id = backendId_to_backendIP.keySet()[0] - def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) - - logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err) - assertEquals(code, 0) - def configList = parseJson(out.trim()) - assert configList instanceof List - - def s3BucketName = getS3BucketName() - def random = new Random(); - - def s3WithProperties = """WITH S3 ( - |"AWS_ACCESS_KEY" = "${getS3AK()}", - |"AWS_SECRET_KEY" = "${getS3SK()}", - |"AWS_ENDPOINT" = "${getS3Endpoint()}", - |"AWS_REGION" = "${getS3Region()}") - |PROPERTIES( - |"exec_mem_limit" = "8589934592", - |"load_parallelism" = "3")""".stripMargin() - - // set fe configuration - sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')" - - def tableName = "column_witdh_array" - - def table_create_task = { table_name -> - // drop table if exists - sql """drop table if exists ${table_name}""" - // create table - def create_table = new File("""${context.file.parent}/ddl/${table_name}.sql""").text - create_table = create_table.replaceAll("\\\$\\{table\\_name\\}", table_name) - sql create_table - } - - def table_load_task = { table_name -> - uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString() - loadLabel = table_name + "_" + uniqueID - //loadLabel = table_name + '_load_5' - loadSql = new File("""${context.file.parent}/ddl/${table_name}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName) - loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) - loadSql = loadSql.replaceAll("\\\$\\{table\\_name\\}", table_name) - nowloadSql = loadSql + s3WithProperties - try_sql nowloadSql - - while (true) { - def stateResult = sql "show load where Label = '${loadLabel}'" - logger.info("load result is ${stateResult}") - def loadState = stateResult[stateResult.size() - 1][2].toString() - if ("CANCELLED".equalsIgnoreCase(loadState)) { - throw new IllegalStateException("load ${loadLabel} failed.") - } else if ("FINISHED".equalsIgnoreCase(loadState)) { - break - } - sleep(5000) - } - } - - table_create_task(tableName) - table_load_task(tableName) - - def tablets = sql_return_maparray """ show tablets from ${tableName}; """ - - boolean isOverLap = true - int tryCnt = 0; - while (isOverLap && tryCnt < 3) { - isOverLap = false - - for (def tablet in tablets) { - String tablet_id = tablet.TabletId - backend_id = tablet.BackendId - (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) - logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) - assertEquals(code, 0) - def compactJson = parseJson(out.trim()) - assertEquals("success", compactJson.status.toLowerCase()) - } - - // wait for all compactions done - for (def tablet in tablets) { - boolean running = true - do { - Thread.sleep(1000) - String tablet_id = tablet.TabletId - backend_id = tablet.BackendId - (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) - logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) - assertEquals(code, 0) - def compactionStatus = parseJson(out.trim()) - assertEquals("success", compactionStatus.status.toLowerCase()) - running = compactionStatus.run_status - } while (running) - } - - for (def tablet in tablets) { - String tablet_id = tablet.TabletId - (code, out, err) = curl("GET", tablet.CompactionStatus) - logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err) - assertEquals(code, 0) - def tabletJson = parseJson(out.trim()) - assert tabletJson.rowsets instanceof List - for (String rowset in (List) tabletJson.rowsets) { - logger.info("rowset info" + rowset) - String overLappingStr = rowset.split(" ")[3] - if (overLappingStr == "OVERLAPPING") { - isOverLap = true; - } - logger.info("is over lap " + isOverLap + " " + overLappingStr) - } - } - tryCnt++; - } - - assertFalse(isOverLap); -}