diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 6128bdfb75258d..3dd5c1a3399643 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -445,7 +445,7 @@ Status BaseTablet::calc_delete_bitmap_between_segments(
 }
 
 std::vector<RowsetSharedPtr> BaseTablet::get_rowset_by_ids(
-        const RowsetIdUnorderedSet* specified_rowset_ids, bool include_stale) {
+        const RowsetIdUnorderedSet* specified_rowset_ids) {
     std::vector<RowsetSharedPtr> rowsets;
     for (auto& rs : _rs_version_map) {
         if (!specified_rowset_ids ||
@@ -454,15 +454,6 @@ std::vector<RowsetSharedPtr> BaseTablet::get_rowset_by_ids(
         }
     }
 
-    if (include_stale && specified_rowset_ids != nullptr &&
-        rowsets.size() != specified_rowset_ids->size()) {
-        for (auto& rs : _stale_rs_version_map) {
-            if (specified_rowset_ids->find(rs.second->rowset_id()) != specified_rowset_ids->end()) {
-                rowsets.push_back(rs.second);
-            }
-        }
-    }
-
     std::sort(rowsets.begin(), rowsets.end(), [](RowsetSharedPtr& lhs, RowsetSharedPtr& rhs) {
         return lhs->end_version() > rhs->end_version();
     });
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 8f41ef6a57b4ea..397e93e80586c0 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -128,8 +128,8 @@ class BaseTablet {
     ////////////////////////////////////////////////////////////////////////////
     // begin MoW functions
     ////////////////////////////////////////////////////////////////////////////
-    std::vector<RowsetSharedPtr> get_rowset_by_ids(const RowsetIdUnorderedSet* specified_rowset_ids,
-                                                   bool include_stale = false);
+    std::vector<RowsetSharedPtr> get_rowset_by_ids(
+            const RowsetIdUnorderedSet* specified_rowset_ids);
 
     // Lookup a row with TupleDescriptor and fill Block
     Status lookup_row_data(const Slice& encoded_key, const RowLocation& row_location,
diff --git a/be/src/olap/cumulative_compaction_policy.cpp b/be/src/olap/cumulative_compaction_policy.cpp
index 0d35ae7ca4f148..ee7a2b1812a0ae 100644
--- a/be/src/olap/cumulative_compaction_policy.cpp
+++ b/be/src/olap/cumulative_compaction_policy.cpp
@@ -284,6 +284,8 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
         transient_size += 1;
         input_rowsets->push_back(rowset);
     }
+    DBUG_EXECUTE_IF("SizeBaseCumulativeCompactionPolicy.pick_input_rowsets.return_input_rowsets",
+                    { return transient_size; })
 
     if (total_size >= promotion_size) {
         return transient_size;
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index 782ebb5a60fc55..68605b3d3ac2fe 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -35,6 +35,7 @@
 
 #include "io/io_common.h"
 #include "olap/olap_define.h"
+#include "olap/rowset/rowset_fwd.h"
 #include "util/hash_util.hpp"
 #include "util/uid_util.h"
 
@@ -492,11 +493,16 @@ class DeleteBitmap;
 // merge on write context
 struct MowContext {
     MowContext(int64_t version, int64_t txnid, const RowsetIdUnorderedSet& ids,
-               std::shared_ptr<DeleteBitmap> db)
-            : max_version(version), txn_id(txnid), rowset_ids(ids), delete_bitmap(db) {}
+               const std::vector<RowsetSharedPtr>& rowset_ptrs, std::shared_ptr<DeleteBitmap> db)
+            : max_version(version),
+              txn_id(txnid),
+              rowset_ids(ids),
+              rowset_ptrs(rowset_ptrs),
+              delete_bitmap(db) {}
     int64_t max_version;
     int64_t txn_id;
     const RowsetIdUnorderedSet& rowset_ids;
+    std::vector<RowsetSharedPtr> rowset_ptrs;
     std::shared_ptr<DeleteBitmap> delete_bitmap;
 };
 
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index f6be1917e57840..1606064e463c24 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -435,24 +435,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
     std::vector<RowsetSharedPtr> specified_rowsets;
     {
         std::shared_lock rlock(tablet->get_header_lock());
-
-        // Under normal circumstances, `get_rowset_by_ids` does not need to consider the stale
-        // rowset, in other word, if a rowset id is not found in the normal rowset, we can ignore
-        // it. This is because even if we handle stale rowset here, we need to recalculate the
-        // new rowset generated by the corresponding compaction in the publish phase.
-        // However, for partial update, ignoring the stale rowset may cause some keys to not be
-        // found in the flush phase (lookup_row_key returns KEY_NOT_FOUND), and thus be mistaken
-        // as new keys in the flush phase, which will cause the load to fail in the following
-        // two cases:
-        // 1. when strict_mode is enabled, new keys are not allowed to be added.
-        // 2. Some columns that need to be filled are neither nullable nor have a default value,
-        //    in which case the value of the field cannot be filled as a new key, leading to a
-        //    failure of the load.
-        bool should_include_stale =
-                _opts.rowset_ctx->partial_update_info->is_strict_mode ||
-                !_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update;
-        specified_rowsets =
-                tablet->get_rowset_by_ids(&_mow_context->rowset_ids, should_include_stale);
+        specified_rowsets = _mow_context->rowset_ptrs;
         if (specified_rowsets.size() != _mow_context->rowset_ids.size()) {
             // Only when this is a strict mode partial update that missing rowsets here will lead to problems.
             // In other case, the missing rowsets will be calculated in later phases(commit phase/publish phase)
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 8f7bb9fe245f0e..b0b24a79c0a8c2 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -368,24 +368,10 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
 
     std::vector<RowsetSharedPtr> specified_rowsets;
     {
+        DBUG_EXECUTE_IF("VerticalSegmentWriter._append_block_with_partial_content.sleep",
+                        { sleep(60); })
         std::shared_lock rlock(tablet->get_header_lock());
-        // Under normal circumstances, `get_rowset_by_ids` does not need to consider the stale
-        // rowset, in other word, if a rowset id is not found in the normal rowset, we can ignore
-        // it. This is because even if we handle stale rowset here, we need to recalculate the
-        // new rowset generated by the corresponding compaction in the publish phase.
-        // However, for partial update, ignoring the stale rowset may cause some keys to not be
-        // found in the flush phase (lookup_row_key returns KEY_NOT_FOUND), and thus be mistaken
-        // as new keys in the flush phase, which will cause the load to fail in the following
-        // two cases:
-        // 1. when strict_mode is enabled, new keys are not allowed to be added.
-        // 2. Some columns that need to be filled are neither nullable nor have a default value,
-        //    in which case the value of the field cannot be filled as a new key, leading to a
-        //    failure of the load.
-        bool should_include_stale =
-                _opts.rowset_ctx->partial_update_info->is_strict_mode ||
-                !_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update;
-        specified_rowsets =
-                tablet->get_rowset_by_ids(&_mow_context->rowset_ids, should_include_stale);
+        specified_rowsets = _mow_context->rowset_ptrs;
         if (specified_rowsets.size() != _mow_context->rowset_ids.size()) {
             // Only when this is a strict mode partial update that missing rowsets here will lead to problems.
             // In other case, the missing rowsets will be calculated in later phases(commit phase/publish phase)
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index 4458d43c17eb72..9d8b8163b716d7 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -123,6 +123,7 @@ void RowsetBuilder::_garbage_collection() {
 Status BaseRowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& mow_context) {
     std::lock_guard<std::shared_mutex> lck(tablet()->get_header_lock());
     int64_t cur_max_version = tablet()->max_version_unlocked();
+    std::vector<RowsetSharedPtr> rowset_ptrs;
     // tablet is under alter process. The delete bitmap will be calculated after conversion.
     if (tablet()->tablet_state() == TABLET_NOTREADY) {
         // Disable 'partial_update' when the tablet is undergoing a 'schema changing process'
@@ -134,10 +135,11 @@ Status BaseRowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& mow_cont
         _rowset_ids.clear();
     } else {
         RETURN_IF_ERROR(tablet()->get_all_rs_id_unlocked(cur_max_version, &_rowset_ids));
+        rowset_ptrs = tablet()->get_rowset_by_ids(&_rowset_ids);
     }
     _delete_bitmap = std::make_shared<DeleteBitmap>(tablet()->tablet_id());
-    mow_context =
-            std::make_shared<MowContext>(cur_max_version, _req.txn_id, _rowset_ids, _delete_bitmap);
+    mow_context = std::make_shared<MowContext>(cur_max_version, _req.txn_id, _rowset_ids,
+                                               rowset_ptrs, _delete_bitmap);
     return Status::OK();
 }
 
diff --git a/regression-test/suites/fault_injection_p2/test_partial_update_rowset_not_found_fault_injection.groovy b/regression-test/suites/fault_injection_p2/test_partial_update_rowset_not_found_fault_injection.groovy
new file mode 100644
index 00000000000000..befad64da0a3bf
--- /dev/null
+++ b/regression-test/suites/fault_injection_p2/test_partial_update_rowset_not_found_fault_injection.groovy
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_partial_update_rowset_not_found_fault_injection", "p2,nonConcurrent") {
+    def testTable = "test_partial_update_rowset_not_found_fault_injection"
+    sql """ DROP TABLE IF EXISTS ${testTable}"""
+    sql """
+        create table ${testTable}
+            (
+            `k1` INT,
+            `v1` INT NOT NULL,
+            `v2` INT NOT NULL,
+            `v3` INT NOT NULL,
+            `v4` INT NOT NULL,
+            `v5` INT NOT NULL,
+            `v6` INT NOT NULL,
+            `v7` INT NOT NULL,
+            `v8` INT NOT NULL,
+            `v9` INT NOT NULL,
+            `v10` INT NOT NULL
+            )
+            UNIQUE KEY (`k1`)
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1",
+                "disable_auto_compaction" = "true"
+            );
+    """
+
+    def load_data = {
+        streamLoad {
+            table 'test_partial_update_rowset_not_found_fault_injection'
+            set 'column_separator', ','
+            set 'compress_type', 'GZ'
+
+
+            file """${getS3Url()}/regression/fault_injection/test_partial_update_rowset_not_found_falut_injection1.csv.gz"""
+
+            time 300000
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+                assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+            }
+        }
+    }
+
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    def backendId_to_params = [string:[:]]
+
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+    load_data()
+    def error = false
+
+
+    GetDebugPoint().clearDebugPointsForAllBEs()
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("VerticalSegmentWriter._append_block_with_partial_content.sleep")
+        GetDebugPoint().enableDebugPointForAllBEs("SizeBaseCumulativeCompactionPolicy.pick_input_rowsets.return_input_rowsets")
+        def thread = Thread.start{
+            try {
+                sql """update ${testTable} set v10=1"""
+            }
+            catch (Exception e){
+                logger.info(e.getMessage())
+                error = true
+            }
+        }
+
+        Thread.sleep(2000)
+        // trigger compactions for all tablets in ${testTable}
+        def tablets = sql_return_maparray """ show tablets from ${testTable}; """
+        for (def tablet in tablets) {
+            String tablet_id = tablet.TabletId
+            backend_id = tablet.BackendId
+            (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+        }
+
+        thread.join()
+        assertFalse(error)
+    } catch (Exception e){
+        logger.info(e.getMessage())
+        assertFalse(true)
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("VerticalSegmentWriter._append_block_with_partial_content.sleep")
+        GetDebugPoint().disableDebugPointForAllBEs("SizeBaseCumulativeCompactionPolicy.pick_input_rowsets.return_input_rowsets")
+    }
+}
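Note, for context rather than as part of the patch itself: the fix works by snapshotting rowset shared pointers into MowContext once, under the tablet header lock, when the load is initialized (init_mow_context), instead of re-resolving rowset ids inside the segment writers at flush time. Below is a minimal, self-contained C++ sketch of that pattern; Tablet, Rowset and MowContext here are simplified stand-ins for illustration, not the real Doris classes.

// Simplified stand-in types (illustrative only, not the actual Doris code).
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <vector>

struct Rowset {
    int64_t end_version;
};
using RowsetSharedPtr = std::shared_ptr<Rowset>;

struct MowContext {
    // Captured once when the load starts; never re-resolved from ids later.
    std::vector<RowsetSharedPtr> rowset_ptrs;
};

class Tablet {
public:
    std::shared_mutex header_lock;

    // Called at load start, with header_lock held: snapshot the live rowsets.
    std::vector<RowsetSharedPtr> snapshot_rowsets() const {
        std::vector<RowsetSharedPtr> out;
        for (const auto& entry : _rs_version_map) {
            out.push_back(entry.second);
        }
        return out;
    }

    // A compaction may later replace the live map; rowsets captured in the
    // MowContext stay alive through shared ownership.
    void replace_rowsets(std::map<int64_t, RowsetSharedPtr> new_map) {
        std::unique_lock wlock(header_lock);
        _rs_version_map = std::move(new_map);
    }

    std::map<int64_t, RowsetSharedPtr> _rs_version_map;
};

int main() {
    Tablet tablet;
    tablet._rs_version_map[2] = std::make_shared<Rowset>(Rowset {2});
    tablet._rs_version_map[5] = std::make_shared<Rowset>(Rowset {5});

    MowContext ctx;
    {
        // Analogous to BaseRowsetBuilder::init_mow_context() in the patch.
        std::shared_lock rlock(tablet.header_lock);
        ctx.rowset_ptrs = tablet.snapshot_rowsets();
    }

    // A concurrent cumulative compaction rewrites the live rowset map...
    tablet.replace_rowsets({{5, std::make_shared<Rowset>(Rowset {5})}});

    // ...but the flush path still works off the snapshot it started with,
    // so existing keys can be looked up against a consistent set of rowsets.
    return ctx.rowset_ptrs.size() == 2 ? 0 : 1;
}

Because the snapshot holds shared ownership, a rowset that compaction retires from the live version map while a flush is still running (the race the debug points above are used to reproduce in the regression test) remains readable, so key lookups during the flush do not spuriously report KEY_NOT_FOUND for rows that actually exist.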