From 4baffb0542a62fc84b7550cce0761729507177e9 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Thu, 25 Apr 2024 17:09:01 +0800 Subject: [PATCH] Issue: When users execute an update statement, it may go through the partial column update logic. For large datasets, the update may then fail with the error "the unmentioned column xxx should have default value or be nullable for newly inserted rows in non-strict mode partial update". Cause: In the partial column update logic, the existing data columns are read first, and the data is then supplemented and written back. During initialization of the read phase, only the rowset IDs are fetched; the actual rowset objects are fetched later, when they are needed. However, between fetching the rowset IDs and fetching the rowset objects, a compaction may occur, turning an old rowset into a stale rowset. If enough time passes, the stale rowset may be deleted outright, so when the rowset object is needed for the update, it can no longer be found. Although an update using the partial column logic should be able to read all keys and should never encounter new keys, once the rowset disappears the Backend (BE) treats those keys as missing. It then checks whether the unmentioned columns have default values or are nullable, and if that check fails, the error above is thrown. Solution: To avoid this issue during partial column updates, the initialization step now fetches both the rowset IDs and the shared pointers to the rowset objects at the same time. This guarantees that the rowsets can always be found during data retrieval. 
--- be/src/olap/base_tablet.cpp | 11 +- be/src/olap/base_tablet.h | 4 +- be/src/olap/cumulative_compaction_policy.cpp | 2 + be/src/olap/olap_common.h | 10 +- .../olap/rowset/segment_v2/segment_writer.cpp | 19 +-- .../segment_v2/vertical_segment_writer.cpp | 20 +--- be/src/olap/rowset_builder.cpp | 6 +- ...te_rowset_not_found_fault_injection.groovy | 112 ++++++++++++++++++ 8 files changed, 133 insertions(+), 51 deletions(-) create mode 100644 regression-test/suites/fault_injection_p2/test_partial_update_rowset_not_found_fault_injection.groovy diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 6128bdfb75258d..3dd5c1a3399643 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -445,7 +445,7 @@ Status BaseTablet::calc_delete_bitmap_between_segments( } std::vector BaseTablet::get_rowset_by_ids( - const RowsetIdUnorderedSet* specified_rowset_ids, bool include_stale) { + const RowsetIdUnorderedSet* specified_rowset_ids) { std::vector rowsets; for (auto& rs : _rs_version_map) { if (!specified_rowset_ids || @@ -454,15 +454,6 @@ std::vector BaseTablet::get_rowset_by_ids( } } - if (include_stale && specified_rowset_ids != nullptr && - rowsets.size() != specified_rowset_ids->size()) { - for (auto& rs : _stale_rs_version_map) { - if (specified_rowset_ids->find(rs.second->rowset_id()) != specified_rowset_ids->end()) { - rowsets.push_back(rs.second); - } - } - } - std::sort(rowsets.begin(), rowsets.end(), [](RowsetSharedPtr& lhs, RowsetSharedPtr& rhs) { return lhs->end_version() > rhs->end_version(); }); diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index 8f41ef6a57b4ea..397e93e80586c0 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -128,8 +128,8 @@ class BaseTablet { //////////////////////////////////////////////////////////////////////////// // begin MoW functions //////////////////////////////////////////////////////////////////////////// - std::vector get_rowset_by_ids(const 
RowsetIdUnorderedSet* specified_rowset_ids, - bool include_stale = false); + std::vector get_rowset_by_ids( + const RowsetIdUnorderedSet* specified_rowset_ids); // Lookup a row with TupleDescriptor and fill Block Status lookup_row_data(const Slice& encoded_key, const RowLocation& row_location, diff --git a/be/src/olap/cumulative_compaction_policy.cpp b/be/src/olap/cumulative_compaction_policy.cpp index 0d35ae7ca4f148..ee7a2b1812a0ae 100644 --- a/be/src/olap/cumulative_compaction_policy.cpp +++ b/be/src/olap/cumulative_compaction_policy.cpp @@ -284,6 +284,8 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets( transient_size += 1; input_rowsets->push_back(rowset); } + DBUG_EXECUTE_IF("SizeBaseCumulativeCompactionPolicy.pick_input_rowsets.return_input_rowsets", + { return transient_size; }) if (total_size >= promotion_size) { return transient_size; diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index 782ebb5a60fc55..68605b3d3ac2fe 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -35,6 +35,7 @@ #include "io/io_common.h" #include "olap/olap_define.h" +#include "olap/rowset/rowset_fwd.h" #include "util/hash_util.hpp" #include "util/uid_util.h" @@ -492,11 +493,16 @@ class DeleteBitmap; // merge on write context struct MowContext { MowContext(int64_t version, int64_t txnid, const RowsetIdUnorderedSet& ids, - std::shared_ptr db) - : max_version(version), txn_id(txnid), rowset_ids(ids), delete_bitmap(db) {} + const std::vector& rowset_ptrs, std::shared_ptr db) + : max_version(version), + txn_id(txnid), + rowset_ids(ids), + rowset_ptrs(rowset_ptrs), + delete_bitmap(db) {} int64_t max_version; int64_t txn_id; const RowsetIdUnorderedSet& rowset_ids; + std::vector rowset_ptrs; std::shared_ptr delete_bitmap; }; diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index f6be1917e57840..1606064e463c24 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ 
b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -435,24 +435,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* std::vector specified_rowsets; { std::shared_lock rlock(tablet->get_header_lock()); - - // Under normal circumstances, `get_rowset_by_ids` does not need to consider the stale - // rowset, in other word, if a rowset id is not found in the normal rowset, we can ignore - // it. This is because even if we handle stale rowset here, we need to recalculate the - // new rowset generated by the corresponding compaction in the publish phase. - // However, for partial update, ignoring the stale rowset may cause some keys to not be - // found in the flush phase (lookup_row_key returns KEY_NOT_FOUND), and thus be mistaken - // as new keys in the flush phase, which will cause the load to fail in the following - // two cases: - // 1. when strict_mode is enabled, new keys are not allowed to be added. - // 2. Some columns that need to be filled are neither nullable nor have a default value, - // in which case the value of the field cannot be filled as a new key, leading to a - // failure of the load. - bool should_include_stale = - _opts.rowset_ctx->partial_update_info->is_strict_mode || - !_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update; - specified_rowsets = - tablet->get_rowset_by_ids(&_mow_context->rowset_ids, should_include_stale); + specified_rowsets = _mow_context->rowset_ptrs; if (specified_rowsets.size() != _mow_context->rowset_ids.size()) { // Only when this is a strict mode partial update that missing rowsets here will lead to problems. 
// In other case, the missing rowsets will be calculated in later phases(commit phase/publish phase) diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index 8f7bb9fe245f0e..b0b24a79c0a8c2 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -368,24 +368,10 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da std::vector specified_rowsets; { + DBUG_EXECUTE_IF("VerticalSegmentWriter._append_block_with_partial_content.sleep", + { sleep(60); }) std::shared_lock rlock(tablet->get_header_lock()); - // Under normal circumstances, `get_rowset_by_ids` does not need to consider the stale - // rowset, in other word, if a rowset id is not found in the normal rowset, we can ignore - // it. This is because even if we handle stale rowset here, we need to recalculate the - // new rowset generated by the corresponding compaction in the publish phase. - // However, for partial update, ignoring the stale rowset may cause some keys to not be - // found in the flush phase (lookup_row_key returns KEY_NOT_FOUND), and thus be mistaken - // as new keys in the flush phase, which will cause the load to fail in the following - // two cases: - // 1. when strict_mode is enabled, new keys are not allowed to be added. - // 2. Some columns that need to be filled are neither nullable nor have a default value, - // in which case the value of the field cannot be filled as a new key, leading to a - // failure of the load. 
- bool should_include_stale = - _opts.rowset_ctx->partial_update_info->is_strict_mode || - !_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update; - specified_rowsets = - tablet->get_rowset_by_ids(&_mow_context->rowset_ids, should_include_stale); + specified_rowsets = _mow_context->rowset_ptrs; if (specified_rowsets.size() != _mow_context->rowset_ids.size()) { // Only when this is a strict mode partial update that missing rowsets here will lead to problems. // In other case, the missing rowsets will be calculated in later phases(commit phase/publish phase) diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index 4458d43c17eb72..9d8b8163b716d7 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -123,6 +123,7 @@ void RowsetBuilder::_garbage_collection() { Status BaseRowsetBuilder::init_mow_context(std::shared_ptr& mow_context) { std::lock_guard lck(tablet()->get_header_lock()); int64_t cur_max_version = tablet()->max_version_unlocked(); + std::vector rowset_ptrs; // tablet is under alter process. The delete bitmap will be calculated after conversion. 
if (tablet()->tablet_state() == TABLET_NOTREADY) { // Disable 'partial_update' when the tablet is undergoing a 'schema changing process' @@ -134,10 +135,11 @@ Status BaseRowsetBuilder::init_mow_context(std::shared_ptr& mow_cont _rowset_ids.clear(); } else { RETURN_IF_ERROR(tablet()->get_all_rs_id_unlocked(cur_max_version, &_rowset_ids)); + rowset_ptrs = tablet()->get_rowset_by_ids(&_rowset_ids); } _delete_bitmap = std::make_shared(tablet()->tablet_id()); - mow_context = - std::make_shared(cur_max_version, _req.txn_id, _rowset_ids, _delete_bitmap); + mow_context = std::make_shared(cur_max_version, _req.txn_id, _rowset_ids, + rowset_ptrs, _delete_bitmap); return Status::OK(); } diff --git a/regression-test/suites/fault_injection_p2/test_partial_update_rowset_not_found_fault_injection.groovy b/regression-test/suites/fault_injection_p2/test_partial_update_rowset_not_found_fault_injection.groovy new file mode 100644 index 00000000000000..befad64da0a3bf --- /dev/null +++ b/regression-test/suites/fault_injection_p2/test_partial_update_rowset_not_found_fault_injection.groovy @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_partial_update_rowset_not_found_fault_injection", "p2,nonConcurrent") { + def testTable = "test_partial_update_rowset_not_found_fault_injection" + sql """ DROP TABLE IF EXISTS ${testTable}""" + sql """ + create table ${testTable} + ( + `k1` INT, + `v1` INT NOT NULL, + `v2` INT NOT NULL, + `v3` INT NOT NULL, + `v4` INT NOT NULL, + `v5` INT NOT NULL, + `v6` INT NOT NULL, + `v7` INT NOT NULL, + `v8` INT NOT NULL, + `v9` INT NOT NULL, + `v10` INT NOT NULL + ) + UNIQUE KEY (`k1`) + DISTRIBUTED BY HASH(`k1`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "disable_auto_compaction" = "true" + ); + """ + + def load_data = { + streamLoad { + table 'test_partial_update_rowset_not_found_fault_injection' + set 'column_separator', ',' + set 'compress_type', 'GZ' + + + file """${getS3Url()}/regression/fault_injection/test_partial_update_rowset_not_found_falut_injection1.csv.gz""" + + time 300000 + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(json.NumberTotalRows, json.NumberLoadedRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + } + + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + def backendId_to_params = [string:[:]] + + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + load_data() + def error = false + + + GetDebugPoint().clearDebugPointsForAllBEs() + try { + GetDebugPoint().enableDebugPointForAllBEs("VerticalSegmentWriter._append_block_with_partial_content.sleep") + GetDebugPoint().enableDebugPointForAllBEs("SizeBaseCumulativeCompactionPolicy.pick_input_rowsets.return_input_rowsets") + def thread = Thread.start{ + try { + sql """update ${testTable} set v10=1""" + } + catch (Exception e){ + logger.info(e.getMessage()) + error = true + } + } + + Thread.sleep(2000) + 
// trigger compactions for all tablets in ${tableName} + def tablets = sql_return_maparray """ show tablets from ${testTable}; """ + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + backend_id = tablet.BackendId + (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + } + + thread.join() + assertFalse(error) + } catch (Exception e){ + logger.info(e.getMessage()) + assertFalse(true) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("VerticalSegmentWriter._append_block_with_partial_content.sleep") + GetDebugPoint().disableDebugPointForAllBEs("SizeBaseCumulativeCompactionPolicy.pick_input_rowsets.return_input_rowsets") + } +}