From 4c2de425c87bc34cb648795c70452a18b1f8324c Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Mon, 29 Jul 2024 19:10:04 +0800 Subject: [PATCH 1/9] tmp --- be/src/olap/base_tablet.cpp | 34 ++++++++++++++++++++++++++++++-- be/src/olap/rowset/rowset.h | 1 + be/src/olap/rowset/rowset_meta.h | 6 ++++++ 3 files changed, 39 insertions(+), 2 deletions(-) diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 22940b40206de4..a4cd76128c3f72 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -1201,7 +1201,9 @@ Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInf std::unique_ptr transient_rs_writer; DeleteBitmapPtr delete_bitmap = txn_info->delete_bitmap; - if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update) { + bool is_partial_update = + txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update; + if (is_partial_update) { transient_rs_writer = DORIS_TRY(self->create_transient_rowset_writer( *rowset, txn_info->partial_update_info, txn_expiration)); // Partial update might generate new segments when there is conflicts while publish, and mark @@ -1242,6 +1244,33 @@ Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInf } auto t3 = watch.get_elapse_time_us(); + // If a rowset is produced by compaction before the commit phase of the partial update load + // and is not included in txn_info->rowset_ids, we can skip the alignment process of that rowset + // because data remains the same before and after compaction. But we still need to calculate the + // the delete bitmap for that rowset. + std::vector rowsets_skip_alignment; + if (is_partial_update) { + std::vector remained_rowsets; + for (const auto& rowset : specified_rowsets) { + if (rowset->produced_by_compaction()) { + rowsets_skip_alignment.emplace_back(rowset); + } else { + remained_rowsets.emplace_back(rowset); + } + } + if (!rowsets_skip_alignment.empty()) { + specified_rowsets = std::move(remained_rowsets); + } + } + + if (!rowsets_skip_alignment.empty()) { + auto token = self->calc_delete_bitmap_executor()->create_token(); + // set rowset_writer to nullptr to skip the alignment process + RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments, rowsets_skip_alignment, + delete_bitmap, cur_version - 1, token.get(), nullptr)); + RETURN_IF_ERROR(token->wait()); + } + // When there is only one segment, it will be calculated in the current thread. // Otherwise, it will be submitted to the thread pool for calculation. if (segments.size() <= 1) { @@ -1433,7 +1462,8 @@ Status BaseTablet::update_delete_bitmap_without_lock( return Status::InternalError( "debug tablet update delete bitmap without lock random failed"); } else { - LOG(INFO) << "BaseTablet.update_delete_bitmap_without_lock.random_failed not triggered" + LOG(INFO) << "BaseTablet.update_delete_bitmap_without_lock.random_failed not " + "triggered" << ", rnd:" << rnd << ", percent: " << percent; } }); diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h index 310d0901b2a751..6050a33bfc2f5d 100644 --- a/be/src/olap/rowset/rowset.h +++ b/be/src/olap/rowset/rowset.h @@ -169,6 +169,7 @@ class Rowset : public std::enable_shared_from_this { bool is_segments_overlapping() const { return rowset_meta()->is_segments_overlapping(); } KeysType keys_type() { return _schema->keys_type(); } RowsetStatePB rowset_meta_state() const { return rowset_meta()->rowset_state(); } + bool produced_by_compaction() const { return rowset_meta()->produced_by_compaction(); } // remove all files in this rowset // TODO should we rename the method to remove_files() to be more specific? diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h index aa20b5b1ef13ac..c5a573d760c305 100644 --- a/be/src/olap/rowset/rowset_meta.h +++ b/be/src/olap/rowset/rowset_meta.h @@ -255,6 +255,12 @@ class RowsetMeta { return num_segments() > 1 && is_singleton_delta() && segments_overlap() != NONOVERLAPPING; } + bool produced_by_compaction() const { + return has_version() && + (start_version() < end_version() || + (start_version() == end_version() && segments_overlap() == NONOVERLAPPING)); + } + // get the compaction score of this rowset. // if segments are overlapping, the score equals to the number of segments, // otherwise, score is 1. From 2e9920772eb127635d7750a082ae131f054bf1a5 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Mon, 29 Jul 2024 19:39:31 +0800 Subject: [PATCH 2/9] tmp --- be/src/olap/base_tablet.cpp | 6 +++++- be/src/olap/partial_update_info.h | 5 +++-- be/src/olap/rowset_builder.cpp | 13 ++++++++----- be/src/olap/rowset_builder.h | 1 + 4 files changed, 17 insertions(+), 8 deletions(-) diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index a4cd76128c3f72..edc97e456c0dd3 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -1250,9 +1250,13 @@ Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInf // the delete bitmap for that rowset. std::vector rowsets_skip_alignment; if (is_partial_update) { + int64_t max_version_in_flush_phase = + txn_info->partial_update_info->max_version_in_flush_phase; + DCHECK(max_version_in_flush_phase != -1); std::vector remained_rowsets; for (const auto& rowset : specified_rowsets) { - if (rowset->produced_by_compaction()) { + if (rowset->end_version() < max_version_in_flush_phase && + rowset->produced_by_compaction()) { rowsets_skip_alignment.emplace_back(rowset); } else { remained_rowsets.emplace_back(rowset); diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h index f20f9680b0b57a..4b62cb8f0ffb31 100644 --- a/be/src/olap/partial_update_info.h +++ b/be/src/olap/partial_update_info.h @@ -25,10 +25,10 @@ struct PartialUpdateInfo { void init(const TabletSchema& tablet_schema, bool partial_update, const std::set& partial_update_cols, bool is_strict_mode, int64_t timestamp_ms, const std::string& timezone, - const std::string& auto_increment_column) { + const std::string& auto_increment_column, int64_t cur_max_version = -1) { is_partial_update = partial_update; partial_update_input_columns = partial_update_cols; - + max_version_in_flush_phase = cur_max_version; this->timestamp_ms = timestamp_ms; this->timezone = timezone; missing_cids.clear(); @@ -91,6 +91,7 @@ struct PartialUpdateInfo { public: bool is_partial_update {false}; + int64_t max_version_in_flush_phase {-1}; std::set partial_update_input_columns; std::vector missing_cids; std::vector update_cids; diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index 93058c05be332f..85006cc183a79d 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -36,6 +36,7 @@ #include "io/fs/file_writer.h" // IWYU pragma: keep #include "olap/calc_delete_bitmap_executor.h" #include "olap/olap_define.h" +#include "olap/partial_update_info.h" #include "olap/rowset/beta_rowset.h" #include "olap/rowset/beta_rowset_writer.h" #include "olap/rowset/pending_rowset_helper.h" @@ -123,7 +124,7 @@ void RowsetBuilder::_garbage_collection() { Status BaseRowsetBuilder::init_mow_context(std::shared_ptr& mow_context) { std::lock_guard lck(tablet()->get_header_lock()); - int64_t cur_max_version = tablet()->max_version_unlocked(); + _max_version_in_flush_phase = tablet()->max_version_unlocked(); std::vector rowset_ptrs; // tablet is under alter process. The delete bitmap will be calculated after conversion. if (tablet()->tablet_state() == TABLET_NOTREADY) { @@ -135,12 +136,13 @@ Status BaseRowsetBuilder::init_mow_context(std::shared_ptr& mow_cont } _rowset_ids.clear(); } else { - RETURN_IF_ERROR(tablet()->get_all_rs_id_unlocked(cur_max_version, &_rowset_ids)); + RETURN_IF_ERROR( + tablet()->get_all_rs_id_unlocked(_max_version_in_flush_phase, &_rowset_ids)); rowset_ptrs = tablet()->get_rowset_by_ids(&_rowset_ids); } _delete_bitmap = std::make_shared(tablet()->tablet_id()); - mow_context = std::make_shared(cur_max_version, _req.txn_id, _rowset_ids, - rowset_ptrs, _delete_bitmap); + mow_context = std::make_shared(_max_version_in_flush_phase, _req.txn_id, + _rowset_ids, rowset_ptrs, _delete_bitmap); return Status::OK(); } @@ -408,7 +410,8 @@ void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id, table_schema_param->partial_update_input_columns(), table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(), table_schema_param->timezone(), - table_schema_param->auto_increment_coulumn()); + table_schema_param->auto_increment_coulumn(), + _max_version_in_flush_phase); } } // namespace doris diff --git a/be/src/olap/rowset_builder.h b/be/src/olap/rowset_builder.h index e54faee3435c79..7fd578037363a0 100644 --- a/be/src/olap/rowset_builder.h +++ b/be/src/olap/rowset_builder.h @@ -106,6 +106,7 @@ class BaseRowsetBuilder { std::unique_ptr _calc_delete_bitmap_token; // current rowset_ids, used to do diff in publish_version RowsetIdUnorderedSet _rowset_ids; + int64_t _max_version_in_flush_phase {-1}; std::shared_ptr _partial_update_info; From 3a6e314f33907f57c68605af9f17ba3743fc986a Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Tue, 30 Jul 2024 11:26:31 +0800 Subject: [PATCH 3/9] add inject points and skip rowsets case --- be/src/olap/base_tablet.cpp | 10 ++ ...tial_update_column_num_fault_injection.out | 0 ...ial_update_publish_conflict_with_error.out | 0 .../test_partial_update_skip_compaction.out | 11 ++ ...l_update_column_num_fault_injection.groovy | 0 ..._update_publish_conflict_with_error.groovy | 0 ...test_partial_update_skip_compaction.groovy | 164 ++++++++++++++++++ 7 files changed, 185 insertions(+) rename regression-test/data/fault_injection_p0/{ => partial_update}/test_partial_update_column_num_fault_injection.out (100%) rename regression-test/data/fault_injection_p0/{ => partial_update}/test_partial_update_publish_conflict_with_error.out (100%) create mode 100644 regression-test/data/fault_injection_p0/partial_update/test_partial_update_skip_compaction.out rename regression-test/suites/fault_injection_p0/{ => partial_update}/test_partial_update_column_num_fault_injection.groovy (100%) rename regression-test/suites/fault_injection_p0/{ => partial_update}/test_partial_update_publish_conflict_with_error.groovy (100%) create mode 100644 regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index edc97e456c0dd3..4c8e23dc37fc5b 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -1192,6 +1192,16 @@ void BaseTablet::_remove_sentinel_mark_from_delete_bitmap(DeleteBitmapPtr delete Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInfo* txn_info, int64_t txn_id, int64_t txn_expiration) { + DBUG_EXECUTE_IF("BaseTablet::update_delete_bitmap.enable_spin_wait", { + auto block_tablet_id = dp->param("tablet_id"); + LOG_INFO( + "BaseTablet::update_delete_bitmap.enable_spin_wait, self->table_id()={}, " + "block_tablet_id={}", + self->tablet_id(), block_tablet_id); + if (self->tablet_id() == block_tablet_id) { + DBUG_EXECUTE_IF("BaseTablet::update_delete_bitmap.block", DBUG_BLOCK); + } + }); SCOPED_BVAR_LATENCY(g_tablet_update_delete_bitmap_latency); RowsetIdUnorderedSet cur_rowset_ids; RowsetIdUnorderedSet rowset_ids_to_add; diff --git a/regression-test/data/fault_injection_p0/test_partial_update_column_num_fault_injection.out b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_column_num_fault_injection.out similarity index 100% rename from regression-test/data/fault_injection_p0/test_partial_update_column_num_fault_injection.out rename to regression-test/data/fault_injection_p0/partial_update/test_partial_update_column_num_fault_injection.out diff --git a/regression-test/data/fault_injection_p0/test_partial_update_publish_conflict_with_error.out b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_publish_conflict_with_error.out similarity index 100% rename from regression-test/data/fault_injection_p0/test_partial_update_publish_conflict_with_error.out rename to regression-test/data/fault_injection_p0/partial_update/test_partial_update_publish_conflict_with_error.out diff --git a/regression-test/data/fault_injection_p0/partial_update/test_partial_update_skip_compaction.out b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_skip_compaction.out new file mode 100644 index 00000000000000..6c7fe443a894fa --- /dev/null +++ b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_skip_compaction.out @@ -0,0 +1,11 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 + +-- !sql -- +1 999 999 1 1 +2 888 888 2 2 +3 777 777 3 3 + diff --git a/regression-test/suites/fault_injection_p0/test_partial_update_column_num_fault_injection.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_column_num_fault_injection.groovy similarity index 100% rename from regression-test/suites/fault_injection_p0/test_partial_update_column_num_fault_injection.groovy rename to regression-test/suites/fault_injection_p0/partial_update/test_partial_update_column_num_fault_injection.groovy diff --git a/regression-test/suites/fault_injection_p0/test_partial_update_publish_conflict_with_error.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_publish_conflict_with_error.groovy similarity index 100% rename from regression-test/suites/fault_injection_p0/test_partial_update_publish_conflict_with_error.groovy rename to regression-test/suites/fault_injection_p0/partial_update/test_partial_update_publish_conflict_with_error.groovy diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy new file mode 100644 index 00000000000000..7e2e740e555a77 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy @@ -0,0 +1,164 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.util.Date +import java.text.SimpleDateFormat +import org.apache.http.HttpResponse +import org.apache.http.client.methods.HttpPut +import org.apache.http.impl.client.CloseableHttpClient +import org.apache.http.impl.client.HttpClients +import org.apache.http.entity.ContentType +import org.apache.http.entity.StringEntity +import org.apache.http.client.config.RequestConfig +import org.apache.http.client.RedirectStrategy +import org.apache.http.protocol.HttpContext +import org.apache.http.HttpRequest +import org.apache.http.impl.client.LaxRedirectStrategy +import org.apache.http.client.methods.RequestBuilder +import org.apache.http.entity.StringEntity +import org.apache.http.client.methods.CloseableHttpResponse +import org.apache.http.util.EntityUtils +import org.apache.doris.regression.suite.ClusterOptions + +import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility + +suite("test_partial_update_skip_compaction", "nonConcurrent") { + + def table1 = "test_partial_update_skip_compaction" + sql "DROP TABLE IF EXISTS ${table1};" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + `c3` int, + `c4` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + sql "insert into ${table1} values(1,1,1,1,1);" + sql "insert into ${table1} values(2,2,2,2,2);" + sql "insert into ${table1} values(3,3,3,3,3);" + sql "sync;" + order_qt_sql "select * from ${table1};" + + def beNodes = sql_return_maparray("show backends;") + def tabletStat = sql_return_maparray("show tablets from ${table1};").get(0) + def tabletBackendId = tabletStat.BackendId + def tabletId = tabletStat.TabletId + def tabletBackend; + for (def be : beNodes) { + if (be.BackendId == tabletBackendId) { + tabletBackend = be + break; + } + } + logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}"); + + def check_rs_metas = { expected_rs_meta_size, check_func -> + def metaUrl = sql_return_maparray("show tablets from ${table1};").get(0).MetaUrl + def (code, out, err) = curl("GET", metaUrl) + assertEquals(code, 0) + def jsonMeta = parseJson(out.trim()) + + assertEquals(jsonMeta.rs_metas.size(), expected_rs_meta_size) + for (def meta : jsonMeta.rs_metas) { + int startVersion = meta.start_version + int endVersion = meta.end_version + int numSegments = meta.num_segments + int numRows = meta.num_rows + String overlapPb = meta.segments_overlap_pb + logger.info("[${startVersion}-${endVersion}] ${overlapPb} ${meta.num_segments} ${numRows} ${meta.rowset_id_v2}") + check_func(startVersion, endVersion, numSegments, numRows, overlapPb) + } + } + + check_rs_metas(4, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + if (startVersion == 0) { + assertEquals(endVersion, 1) + assertEquals(numSegments, 0) + } else { + assertEquals(startVersion, endVersion) + assertEquals(numSegments, 1) + } + }) + + try { + GetDebugPoint().clearDebugPointsForAllBEs() + + // block the partial update before publish phase + GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.enable_spin_wait", [tablet_id: "${tabletId}"]) + GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block") + def t1 = Thread.start { + sql "set enable_unique_key_partial_update=true;" + sql "sync;" + sql "insert into ${table1}(k1,c1,c2) values(1,999,999),(2,888,888),(3,777,777);" + } + + // trigger full compaction on tablet + logger.info("trigger compaction on another BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}") + def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactJson = parseJson(out.trim()) + assertEquals("success", compactJson.status.toLowerCase()) + + // wait for full compaction to complete + Awaitility.await().atMost(3, TimeUnit.SECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until( + { + (code, out, err) = be_get_compaction_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + return !compactionStatus.run_status + } + ) + + check_rs_metas(1, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + // check the rowset produced by full compaction + assertEquals(startVersion, 0) + assertEquals(endVersion, 4) + assertEquals(overlapPb, "NONOVERLAPPING") + }) + + GetDebugPoint().disableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block") + t1.join() + + order_qt_sql "select * from ${table1};" + + check_rs_metas(2, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + if (startVersion == 5) { + assertEquals(endVersion, 5) + // checks that partial update skips the alignment process of rowsets produced by compaction and + // doesn't generate new segment in publish phase + assertEquals(numSegments, 1) + } + }) + + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + } + + // sql "DROP TABLE IF EXISTS ${table1};" +} From 9efafb444a7a06191ab3e336d6b02e9dab8ae95e Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Tue, 30 Jul 2024 15:08:46 +0800 Subject: [PATCH 4/9] tmp --- ...ial_update_conflict_skip_compaction.groovy | 190 ++++++++++++++++++ ...test_partial_update_skip_compaction.groovy | 43 ++-- 2 files changed, 215 insertions(+), 18 deletions(-) create mode 100644 regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy new file mode 100644 index 00000000000000..a3988be381af19 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.util.Date +import java.text.SimpleDateFormat +import org.apache.http.HttpResponse +import org.apache.http.client.methods.HttpPut +import org.apache.http.impl.client.CloseableHttpClient +import org.apache.http.impl.client.HttpClients +import org.apache.http.entity.ContentType +import org.apache.http.entity.StringEntity +import org.apache.http.client.config.RequestConfig +import org.apache.http.client.RedirectStrategy +import org.apache.http.protocol.HttpContext +import org.apache.http.HttpRequest +import org.apache.http.impl.client.LaxRedirectStrategy +import org.apache.http.client.methods.RequestBuilder +import org.apache.http.entity.StringEntity +import org.apache.http.client.methods.CloseableHttpResponse +import org.apache.http.util.EntityUtils +import org.apache.doris.regression.suite.ClusterOptions +import org.junit.Assert +import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility + +suite("test_partial_update_conflict_skip_compaction", "nonConcurrent") { + + def table1 = "test_partial_update_conflict_skip_compaction" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + `c3` int, + `c4` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + sql "insert into ${table1} values(1,1,1,1,1);" + sql "insert into ${table1} values(2,2,2,2,2);" + sql "insert into ${table1} values(3,3,3,3,3);" + sql "sync;" + order_qt_sql "select * from ${table1};" + + def beNodes = sql_return_maparray("show backends;") + def tabletStat = sql_return_maparray("show tablets from ${table1};").get(0) + def tabletBackendId = tabletStat.BackendId + def tabletId = tabletStat.TabletId + def tabletBackend; + for (def be : beNodes) { + if (be.BackendId == tabletBackendId) { + tabletBackend = be + break; + } + } + logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}"); + + def check_rs_metas = { expected_rs_meta_size, check_func -> + def metaUrl = sql_return_maparray("show tablets from ${table1};").get(0).MetaUrl + def (code, out, err) = curl("GET", metaUrl) + Assert.assertEquals(code, 0) + def jsonMeta = parseJson(out.trim()) + + Assert.assertEquals(jsonMeta.rs_metas.size(), expected_rs_meta_size) + for (def meta : jsonMeta.rs_metas) { + int startVersion = meta.start_version + int endVersion = meta.end_version + int numSegments = meta.num_segments + int numRows = meta.num_rows + String overlapPb = meta.segments_overlap_pb + logger.info("[${startVersion}-${endVersion}] ${overlapPb} ${meta.num_segments} ${numRows} ${meta.rowset_id_v2}") + check_func(startVersion, endVersion, numSegments, numRows, overlapPb) + } + } + + check_rs_metas(4, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + if (startVersion == 0) { + // [0-1] + Assert.assertEquals(endVersion, 1) + Assert.assertEquals(numSegments, 0) + } else { + // [2-2], [3-3], [4-4] + Assert.assertEquals(startVersion, endVersion) + Assert.assertEquals(numSegments, 1) + } + }) + + try { + GetDebugPoint().clearDebugPointsForAllBEs() + + // block the partial update before publish phase + GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.enable_spin_wait", [tablet_id: "${tabletId}"]) + GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block") + + // the first partial update load + def t1 = Thread.start { + sql "set enable_unique_key_partial_update=true;" + sql "sync;" + sql "insert into ${table1}(k1,c1,c2) values(1,999,999),(2,888,888),(3,777,777);" + } + + Thread.sleep(200) + + // the second partial update load that has conflict with the first one + def t2 = Thread.start { + sql "set enable_unique_key_partial_update=true;" + sql "sync;" + sql "insert into ${table1}(k1,c3,c4) values(1,666,666),(3,555,555);" + } + + // trigger full compaction on tablet + logger.info("trigger compaction on another BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}") + def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + Assert.assertEquals(code, 0) + def compactJson = parseJson(out.trim()) + Assert.assertEquals("success", compactJson.status.toLowerCase()) + + // wait for full compaction to complete + Awaitility.await().atMost(3, TimeUnit.SECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until( + { + (code, out, err) = be_get_compaction_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + Assert.assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + Assert.assertEquals("success", compactionStatus.status.toLowerCase()) + return !compactionStatus.run_status + } + ) + + check_rs_metas(1, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + // check the rowset produced by full compaction + // [0-4] + Assert.assertEquals(startVersion, 0) + Assert.assertEquals(endVersion, 4) + Assert.assertEquals(numRows, 3) + Assert.assertEquals(overlapPb, "NONOVERLAPPING") + }) + + GetDebugPoint().disableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.enable_spin_wait") + GetDebugPoint().disableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block") + + t1.join() + t2.join() + + order_qt_sql "select * from ${table1};" + + check_rs_metas(3, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + if (startVersion == 5) { + // the first partial update load + // it should skip the alignment process of rowsets produced by full compaction and + // should not generate new segment in publish phase + Assert.assertEquals(endVersion, 5) + Assert.assertEquals(numSegments, 1) + Assert.assertEquals(numRows, 2) + } else if (startVersion == 6) { + // the first partial update load + // it should skip the alignment process of rowsets produced by full compaction and + // should generate new segment in publish phase for conflicting rows with the first partial update load + Assert.assertEquals(endVersion, 6) + Assert.assertEquals(numSegments, 2) + } + }) + + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + } + + // sql "DROP TABLE IF EXISTS ${table1};" +} diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy index 7e2e740e555a77..63068f1cfc5769 100644 --- a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy +++ b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy @@ -33,14 +33,14 @@ import org.apache.http.entity.StringEntity import org.apache.http.client.methods.CloseableHttpResponse import org.apache.http.util.EntityUtils import org.apache.doris.regression.suite.ClusterOptions - +import org.junit.Assert import java.util.concurrent.TimeUnit import org.awaitility.Awaitility suite("test_partial_update_skip_compaction", "nonConcurrent") { def table1 = "test_partial_update_skip_compaction" - sql "DROP TABLE IF EXISTS ${table1};" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" sql """ CREATE TABLE IF NOT EXISTS ${table1} ( `k1` int NOT NULL, `c1` int, @@ -75,10 +75,11 @@ suite("test_partial_update_skip_compaction", "nonConcurrent") { def check_rs_metas = { expected_rs_meta_size, check_func -> def metaUrl = sql_return_maparray("show tablets from ${table1};").get(0).MetaUrl def (code, out, err) = curl("GET", metaUrl) - assertEquals(code, 0) + Assert.assertEquals(code, 0) def jsonMeta = parseJson(out.trim()) - assertEquals(jsonMeta.rs_metas.size(), expected_rs_meta_size) + logger.info("check size: ${jsonMeta.rs_metas.size()} v.s ${expected_rs_meta_size}") + Assert.assertEquals(jsonMeta.rs_metas.size(), expected_rs_meta_size) for (def meta : jsonMeta.rs_metas) { int startVersion = meta.start_version int endVersion = meta.end_version @@ -92,11 +93,13 @@ suite("test_partial_update_skip_compaction", "nonConcurrent") { check_rs_metas(4, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> if (startVersion == 0) { - assertEquals(endVersion, 1) - assertEquals(numSegments, 0) + // [0-1] + Assert.assertEquals(endVersion, 1) + Assert.assertEquals(numSegments, 0) } else { - assertEquals(startVersion, endVersion) - assertEquals(numSegments, 1) + // [2-2], [3-3], [4-4] + Assert.assertEquals(startVersion, endVersion) + Assert.assertEquals(numSegments, 1) } }) @@ -116,29 +119,32 @@ suite("test_partial_update_skip_compaction", "nonConcurrent") { logger.info("trigger compaction on another BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}") def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId) logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) - assertEquals(code, 0) + Assert.assertEquals(code, 0) def compactJson = parseJson(out.trim()) - assertEquals("success", compactJson.status.toLowerCase()) + Assert.assertEquals("success", compactJson.status.toLowerCase()) // wait for full compaction to complete Awaitility.await().atMost(3, TimeUnit.SECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until( { (code, out, err) = be_get_compaction_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId) logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) - assertEquals(code, 0) + Assert.assertEquals(code, 0) def compactionStatus = parseJson(out.trim()) - assertEquals("success", compactionStatus.status.toLowerCase()) + Assert.assertEquals("success", compactionStatus.status.toLowerCase()) return !compactionStatus.run_status } ) check_rs_metas(1, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> // check the rowset produced by full compaction - assertEquals(startVersion, 0) - assertEquals(endVersion, 4) - assertEquals(overlapPb, "NONOVERLAPPING") + // [0-4] + Assert.assertEquals(startVersion, 0) + Assert.assertEquals(endVersion, 4) + Assert.assertEquals(numRows, 3) + Assert.assertEquals(overlapPb, "NONOVERLAPPING") }) + // let the partial update load publish GetDebugPoint().disableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block") t1.join() @@ -146,10 +152,11 @@ suite("test_partial_update_skip_compaction", "nonConcurrent") { check_rs_metas(2, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> if (startVersion == 5) { - assertEquals(endVersion, 5) + // [5-5] + Assert.assertEquals(endVersion, 5) // checks that partial update skips the alignment process of rowsets produced by compaction and // doesn't generate new segment in publish phase - assertEquals(numSegments, 1) + Assert.assertEquals(numSegments, 1) } }) @@ -160,5 +167,5 @@ suite("test_partial_update_skip_compaction", "nonConcurrent") { GetDebugPoint().clearDebugPointsForAllBEs() } - // sql "DROP TABLE IF EXISTS ${table1};" + sql "DROP TABLE IF EXISTS ${table1};" } From 8d0999a048078fd1d25f53286250aa6877abe13f Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Tue, 30 Jul 2024 16:03:54 +0800 Subject: [PATCH 5/9] fix test --- be/src/olap/base_tablet.cpp | 14 ++------ .../olap/task/engine_publish_version_task.cpp | 1 + ...artial_update_conflict_skip_compaction.out | 11 +++++++ ...ial_update_conflict_skip_compaction.groovy | 33 +++++-------------- ...test_partial_update_skip_compaction.groovy | 31 ++++------------- 5 files changed, 30 insertions(+), 60 deletions(-) create mode 100644 regression-test/data/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.out diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 4c8e23dc37fc5b..211aeee3bd52b6 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -1192,16 +1192,6 @@ void BaseTablet::_remove_sentinel_mark_from_delete_bitmap(DeleteBitmapPtr delete Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInfo* txn_info, int64_t txn_id, int64_t txn_expiration) { - DBUG_EXECUTE_IF("BaseTablet::update_delete_bitmap.enable_spin_wait", { - auto block_tablet_id = dp->param("tablet_id"); - LOG_INFO( - "BaseTablet::update_delete_bitmap.enable_spin_wait, self->table_id()={}, " - "block_tablet_id={}", - self->tablet_id(), block_tablet_id); - if (self->tablet_id() == block_tablet_id) { - DBUG_EXECUTE_IF("BaseTablet::update_delete_bitmap.block", DBUG_BLOCK); - } - }); SCOPED_BVAR_LATENCY(g_tablet_update_delete_bitmap_latency); RowsetIdUnorderedSet cur_rowset_ids; RowsetIdUnorderedSet rowset_ids_to_add; @@ -1262,10 +1252,12 @@ Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInf if (is_partial_update) { int64_t max_version_in_flush_phase = txn_info->partial_update_info->max_version_in_flush_phase; + LOG_INFO("BaseTablet::update_delete_bitmap, max_version_in_flush_phase={}", + max_version_in_flush_phase); DCHECK(max_version_in_flush_phase != -1); std::vector remained_rowsets; for (const auto& rowset : specified_rowsets) { - if (rowset->end_version() < max_version_in_flush_phase && + if (rowset->end_version() <= max_version_in_flush_phase && rowset->produced_by_compaction()) { rowsets_skip_alignment.emplace_back(rowset); } else { diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index acdcebae165c6f..3c669b9d86eac0 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -111,6 +111,7 @@ Status EnginePublishVersionTask::execute() { std::this_thread::sleep_for(std::chrono::milliseconds(wait)); } }); + DBUG_EXECUTE_IF("EnginePublishVersionTask::execute.block", DBUG_BLOCK); std::unique_ptr token = _engine.tablet_publish_txn_thread_pool()->new_token( ThreadPool::ExecutionMode::CONCURRENT); std::unordered_map tablet_id_to_num_delta_rows; diff --git a/regression-test/data/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.out b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.out new file mode 100644 index 00000000000000..df12f4b08e5706 --- /dev/null +++ b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.out @@ -0,0 +1,11 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 + +-- !sql -- +1 999 999 666 666 +2 888 888 2 2 +3 777 777 555 555 + diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy index a3988be381af19..afa6caf7c928ae 100644 --- a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy +++ b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy @@ -15,24 +15,6 @@ // specific language governing permissions and limitations // under the License. -import java.util.Date -import java.text.SimpleDateFormat -import org.apache.http.HttpResponse -import org.apache.http.client.methods.HttpPut -import org.apache.http.impl.client.CloseableHttpClient -import org.apache.http.impl.client.HttpClients -import org.apache.http.entity.ContentType -import org.apache.http.entity.StringEntity -import org.apache.http.client.config.RequestConfig -import org.apache.http.client.RedirectStrategy -import org.apache.http.protocol.HttpContext -import org.apache.http.HttpRequest -import org.apache.http.impl.client.LaxRedirectStrategy -import org.apache.http.client.methods.RequestBuilder -import org.apache.http.entity.StringEntity -import org.apache.http.client.methods.CloseableHttpResponse -import org.apache.http.util.EntityUtils -import org.apache.doris.regression.suite.ClusterOptions import org.junit.Assert import java.util.concurrent.TimeUnit import org.awaitility.Awaitility @@ -106,8 +88,7 @@ suite("test_partial_update_conflict_skip_compaction", "nonConcurrent") { GetDebugPoint().clearDebugPointsForAllBEs() // block the partial update before publish phase - GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.enable_spin_wait", [tablet_id: "${tabletId}"]) - GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block") + GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") // the first partial update load def t1 = Thread.start { @@ -116,7 +97,7 @@ suite("test_partial_update_conflict_skip_compaction", "nonConcurrent") { sql "insert into ${table1}(k1,c1,c2) values(1,999,999),(2,888,888),(3,777,777);" } - Thread.sleep(200) + Thread.sleep(300) // the second partial update load that has conflict with the first one def t2 = Thread.start { @@ -125,6 +106,8 @@ suite("test_partial_update_conflict_skip_compaction", "nonConcurrent") { sql "insert into ${table1}(k1,c3,c4) values(1,666,666),(3,555,555);" } + Thread.sleep(300) + // trigger full compaction on tablet logger.info("trigger compaction on another BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}") def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId) @@ -134,7 +117,7 @@ suite("test_partial_update_conflict_skip_compaction", "nonConcurrent") { Assert.assertEquals("success", compactJson.status.toLowerCase()) // wait for full compaction to complete - Awaitility.await().atMost(3, TimeUnit.SECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until( + Awaitility.await().atMost(3, TimeUnit.SECONDS).pollDelay(200, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until( { (code, out, err) = be_get_compaction_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId) logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) @@ -154,8 +137,7 @@ suite("test_partial_update_conflict_skip_compaction", "nonConcurrent") { Assert.assertEquals(overlapPb, "NONOVERLAPPING") }) - GetDebugPoint().disableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.enable_spin_wait") - GetDebugPoint().disableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block") + GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") t1.join() t2.join() @@ -169,13 +151,14 @@ suite("test_partial_update_conflict_skip_compaction", "nonConcurrent") { // should not generate new segment in publish phase Assert.assertEquals(endVersion, 5) Assert.assertEquals(numSegments, 1) - Assert.assertEquals(numRows, 2) + Assert.assertEquals(numRows, 3) } else if (startVersion == 6) { // the first partial update load // it should skip the alignment process of rowsets produced by full compaction and // should generate new segment in publish phase for conflicting rows with the first partial update load Assert.assertEquals(endVersion, 6) Assert.assertEquals(numSegments, 2) + Assert.assertEquals(numRows, 4) // 4 = 2 + 2 } }) diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy index 63068f1cfc5769..91cb34f45812dc 100644 --- a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy +++ b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy @@ -15,24 +15,6 @@ // specific language governing permissions and limitations // under the License. -import java.util.Date -import java.text.SimpleDateFormat -import org.apache.http.HttpResponse -import org.apache.http.client.methods.HttpPut -import org.apache.http.impl.client.CloseableHttpClient -import org.apache.http.impl.client.HttpClients -import org.apache.http.entity.ContentType -import org.apache.http.entity.StringEntity -import org.apache.http.client.config.RequestConfig -import org.apache.http.client.RedirectStrategy -import org.apache.http.protocol.HttpContext -import org.apache.http.HttpRequest -import org.apache.http.impl.client.LaxRedirectStrategy -import org.apache.http.client.methods.RequestBuilder -import org.apache.http.entity.StringEntity -import org.apache.http.client.methods.CloseableHttpResponse -import org.apache.http.util.EntityUtils -import org.apache.doris.regression.suite.ClusterOptions import org.junit.Assert import java.util.concurrent.TimeUnit import org.awaitility.Awaitility @@ -78,7 +60,6 @@ suite("test_partial_update_skip_compaction", "nonConcurrent") { Assert.assertEquals(code, 0) def jsonMeta = parseJson(out.trim()) - logger.info("check size: ${jsonMeta.rs_metas.size()} v.s ${expected_rs_meta_size}") Assert.assertEquals(jsonMeta.rs_metas.size(), expected_rs_meta_size) for (def meta : jsonMeta.rs_metas) { int startVersion = meta.start_version @@ -106,15 +87,16 @@ suite("test_partial_update_skip_compaction", "nonConcurrent") { try { GetDebugPoint().clearDebugPointsForAllBEs() - // block the partial update before publish phase - GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.enable_spin_wait", [tablet_id: "${tabletId}"]) - GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block") + // block the partial update in publish phase + GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") def t1 = Thread.start { sql "set enable_unique_key_partial_update=true;" sql "sync;" sql "insert into ${table1}(k1,c1,c2) values(1,999,999),(2,888,888),(3,777,777);" } + Thread.sleep(500) + // trigger full compaction on tablet logger.info("trigger compaction on another BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}") def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId) @@ -124,7 +106,7 @@ suite("test_partial_update_skip_compaction", "nonConcurrent") { Assert.assertEquals("success", compactJson.status.toLowerCase()) // wait for full compaction to complete - Awaitility.await().atMost(3, TimeUnit.SECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until( + Awaitility.await().atMost(3, TimeUnit.SECONDS).pollDelay(200, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until( { (code, out, err) = be_get_compaction_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId) logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) @@ -145,7 +127,7 @@ suite("test_partial_update_skip_compaction", "nonConcurrent") { }) // let the partial update load publish - GetDebugPoint().disableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block") + GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") t1.join() order_qt_sql "select * from ${table1};" @@ -157,6 +139,7 @@ suite("test_partial_update_skip_compaction", "nonConcurrent") { // checks that partial update skips the alignment process of rowsets produced by compaction and // doesn't generate new segment in publish phase Assert.assertEquals(numSegments, 1) + Assert.assertEquals(numRows, 3) } }) From d92504acd2ea835716ca63a22a60cf716432274b Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Tue, 30 Jul 2024 16:43:05 +0800 Subject: [PATCH 6/9] add debug point for cloud --- .../CloudGlobalTransactionMgr.java | 11 +++++++ ...ial_update_conflict_skip_compaction.groovy | 30 ++++++++++++++++--- ...test_partial_update_skip_compaction.groovy | 26 ++++++++++++++-- 3 files changed, 61 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index 862c72fe742709..9323f8ef6c86ed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -715,6 +715,17 @@ private void getDeleteBitmapUpdateLock(Map> tableToParttions, lo LOG.info("error ", e); } } + if (DebugPointUtil.isEnable("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")) { + LOG.info("debug point: block at CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block"); + while (DebugPointUtil.isEnable("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")) { + try { + Thread.sleep(200); + } catch (InterruptedException e) { + LOG.info("error ", e); + } + } + LOG.info("debug point: leave CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock"); + } StopWatch stopWatch = new StopWatch(); stopWatch.start(); int totalRetryTime = 0; diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy index afa6caf7c928ae..4cdcf2e944d2a4 100644 --- a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy +++ b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy @@ -55,6 +55,10 @@ suite("test_partial_update_conflict_skip_compaction", "nonConcurrent") { logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}"); def check_rs_metas = { expected_rs_meta_size, check_func -> + if (isCloudMode()) { + return + } + def metaUrl = sql_return_maparray("show tablets from ${table1};").get(0).MetaUrl def (code, out, err) = curl("GET", metaUrl) Assert.assertEquals(code, 0) @@ -84,11 +88,28 @@ suite("test_partial_update_conflict_skip_compaction", "nonConcurrent") { } }) + def enable_block_in_publish = { + if (isCloudMode()) { + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + } else { + GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") + } + } + + def disable_block_in_publish = { + if (isCloudMode()) { + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + } else { + GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") + } + } + try { + GetDebugPoint().clearDebugPointsForAllFEs() GetDebugPoint().clearDebugPointsForAllBEs() // block the partial update before publish phase - GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") + enable_block_in_publish() // the first partial update load def t1 = Thread.start { @@ -137,7 +158,7 @@ suite("test_partial_update_conflict_skip_compaction", "nonConcurrent") { Assert.assertEquals(overlapPb, "NONOVERLAPPING") }) - GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") + disable_block_in_publish() t1.join() t2.join() @@ -153,7 +174,7 @@ suite("test_partial_update_conflict_skip_compaction", "nonConcurrent") { Assert.assertEquals(numSegments, 1) Assert.assertEquals(numRows, 3) } else if (startVersion == 6) { - // the first partial update load + // the second partial update load // it should skip the alignment process of rowsets produced by full compaction and // should generate new segment in publish phase for conflicting rows with the first partial update load Assert.assertEquals(endVersion, 6) @@ -166,8 +187,9 @@ suite("test_partial_update_conflict_skip_compaction", "nonConcurrent") { logger.info(e.getMessage()) throw e } finally { + GetDebugPoint().clearDebugPointsForAllFEs() GetDebugPoint().clearDebugPointsForAllBEs() } - // sql "DROP TABLE IF EXISTS ${table1};" + sql "DROP TABLE IF EXISTS ${table1};" } diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy index 91cb34f45812dc..2cb99b87f8693d 100644 --- a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy +++ b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy @@ -55,6 +55,10 @@ suite("test_partial_update_skip_compaction", "nonConcurrent") { logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}"); def check_rs_metas = { expected_rs_meta_size, check_func -> + if (isCloudMode()) { + return + } + def metaUrl = sql_return_maparray("show tablets from ${table1};").get(0).MetaUrl def (code, out, err) = curl("GET", metaUrl) Assert.assertEquals(code, 0) @@ -84,11 +88,28 @@ suite("test_partial_update_skip_compaction", "nonConcurrent") { } }) + def enable_block_in_publish = { + if (isCloudMode()) { + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + } else { + GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") + } + } + + def disable_block_in_publish = { + if (isCloudMode()) { + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + } else { + GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") + } + } + try { + GetDebugPoint().clearDebugPointsForAllFEs() GetDebugPoint().clearDebugPointsForAllBEs() // block the partial update in publish phase - GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") + enable_block_in_publish() def t1 = Thread.start { sql "set enable_unique_key_partial_update=true;" sql "sync;" @@ -127,7 +148,7 @@ suite("test_partial_update_skip_compaction", "nonConcurrent") { }) // let the partial update load publish - GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") + disable_block_in_publish() t1.join() order_qt_sql "select * from ${table1};" @@ -147,6 +168,7 @@ suite("test_partial_update_skip_compaction", "nonConcurrent") { logger.info(e.getMessage()) throw e } finally { + GetDebugPoint().clearDebugPointsForAllFEs() GetDebugPoint().clearDebugPointsForAllBEs() } From 2c711fe5551519cfa7939be0f5308b0fdf6740d9 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Tue, 30 Jul 2024 20:04:18 +0800 Subject: [PATCH 7/9] add case for partial update meet compaction with higher version --- .../olap/task/engine_publish_version_task.cpp | 15 +- .../concurrency_update1.csv | 0 .../concurrency_update2.csv | 0 .../concurrency_update3.csv | 0 ..._update_compaction_with_higher_version.out | 11 + ...date_compaction_with_higher_version.groovy | 221 ++++++++++++++++++ ...ial_update_conflict_skip_compaction.groovy | 15 ++ ...test_partial_update_skip_compaction.groovy | 15 ++ 8 files changed, 276 insertions(+), 1 deletion(-) rename regression-test/data/fault_injection_p0/{ => partial_update}/concurrency_update1.csv (100%) rename regression-test/data/fault_injection_p0/{ => partial_update}/concurrency_update2.csv (100%) rename regression-test/data/fault_injection_p0/{ => partial_update}/concurrency_update3.csv (100%) create mode 100644 regression-test/data/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.out create mode 100644 regression-test/suites/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.groovy diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index 3c669b9d86eac0..ae7b694b6dfab4 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -111,7 +111,20 @@ Status EnginePublishVersionTask::execute() { std::this_thread::sleep_for(std::chrono::milliseconds(wait)); } }); - DBUG_EXECUTE_IF("EnginePublishVersionTask::execute.block", DBUG_BLOCK); + DBUG_EXECUTE_IF("EnginePublishVersionTask::execute.enable_spin_wait", { + auto token = dp->param("token", "invalid_token"); + while (DebugPoints::instance()->is_enable("EnginePublishVersionTask::execute.block")) { + auto block_dp = DebugPoints::instance()->get_debug_point( + "EnginePublishVersionTask::execute.block"); + if (block_dp) { + auto pass_token = block_dp->param("pass_token", ""); + if (pass_token == token) { + break; + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + }); std::unique_ptr token = _engine.tablet_publish_txn_thread_pool()->new_token( ThreadPool::ExecutionMode::CONCURRENT); std::unordered_map tablet_id_to_num_delta_rows; diff --git a/regression-test/data/fault_injection_p0/concurrency_update1.csv b/regression-test/data/fault_injection_p0/partial_update/concurrency_update1.csv similarity index 100% rename from regression-test/data/fault_injection_p0/concurrency_update1.csv rename to regression-test/data/fault_injection_p0/partial_update/concurrency_update1.csv diff --git a/regression-test/data/fault_injection_p0/concurrency_update2.csv b/regression-test/data/fault_injection_p0/partial_update/concurrency_update2.csv similarity index 100% rename from regression-test/data/fault_injection_p0/concurrency_update2.csv rename to regression-test/data/fault_injection_p0/partial_update/concurrency_update2.csv diff --git a/regression-test/data/fault_injection_p0/concurrency_update3.csv b/regression-test/data/fault_injection_p0/partial_update/concurrency_update3.csv similarity index 100% rename from regression-test/data/fault_injection_p0/concurrency_update3.csv rename to regression-test/data/fault_injection_p0/partial_update/concurrency_update3.csv diff --git a/regression-test/data/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.out b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.out new file mode 100644 index 00000000000000..df12f4b08e5706 --- /dev/null +++ b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.out @@ -0,0 +1,11 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 + +-- !sql -- +1 999 999 666 666 +2 888 888 2 2 +3 777 777 555 555 + diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.groovy new file mode 100644 index 00000000000000..0d875d68ea37ac --- /dev/null +++ b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.groovy @@ -0,0 +1,221 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.junit.Assert +import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility + +suite("test_partial_update_compaction_with_higher_version", "nonConcurrent") { + + // DOING(baohan) + def table1 = "test_partial_update_compaction_with_higher_version" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + `c3` int, + `c4` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + sql "insert into ${table1} values(1,1,1,1,1);" + sql "insert into ${table1} values(2,2,2,2,2);" + sql "insert into ${table1} values(3,3,3,3,3);" + sql "sync;" + order_qt_sql "select * from ${table1};" + + def beNodes = sql_return_maparray("show backends;") + def tabletStat = sql_return_maparray("show tablets from ${table1};").get(0) + def tabletBackendId = tabletStat.BackendId + def tabletId = tabletStat.TabletId + def tabletBackend; + for (def be : beNodes) { + if (be.BackendId == tabletBackendId) { + tabletBackend = be + break; + } + } + logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}"); + + def check_rs_metas = { expected_rs_meta_size, check_func -> + if (isCloudMode()) { + return + } + + def metaUrl = sql_return_maparray("show tablets from ${table1};").get(0).MetaUrl + def (code, out, err) = curl("GET", metaUrl) + Assert.assertEquals(code, 0) + def jsonMeta = parseJson(out.trim()) + + Assert.assertEquals(jsonMeta.rs_metas.size(), expected_rs_meta_size) + for (def meta : jsonMeta.rs_metas) { + int startVersion = meta.start_version + int endVersion = meta.end_version + int numSegments = meta.num_segments + int numRows = meta.num_rows + String overlapPb = meta.segments_overlap_pb + logger.info("[${startVersion}-${endVersion}] ${overlapPb} ${meta.num_segments} ${numRows} ${meta.rowset_id_v2}") + check_func(startVersion, endVersion, numSegments, numRows, overlapPb) + } + } + + check_rs_metas(4, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + if (startVersion == 0) { + // [0-1] + Assert.assertEquals(endVersion, 1) + Assert.assertEquals(numSegments, 0) + } else { + // [2-2], [3-3], [4-4] + Assert.assertEquals(startVersion, endVersion) + Assert.assertEquals(numSegments, 1) + } + }) + + def enable_publish_spin_wait = { tokenName -> + if (isCloudMode()) { + } else { + GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait", [token: "${tokenName}"]) + } + } + + def disable_publish_spin_wait = { + if (isCloudMode()) { + } else { + GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait") + } + } + + def enable_block_in_publish = { passToken -> + if (isCloudMode()) { + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + } else { + GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block", [pass_token: "${passToken}"]) + } + } + + def disable_block_in_publish = { + if (isCloudMode()) { + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + } else { + GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") + } + } + + try { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + // block the partial update in publish phase + enable_publish_spin_wait("token1") + enable_block_in_publish("-1") + + // the first partial update load + def t1 = Thread.start { + sql "set enable_unique_key_partial_update=true;" + sql "sync;" + sql "insert into ${table1}(k1,c1,c2) values(1,999,999),(2,888,888),(3,777,777);" + } + + Thread.sleep(600) + + // the second partial update load that conflicts with the first one + enable_publish_spin_wait("token2") + def t2 = Thread.start { + sql "set enable_unique_key_partial_update=true;" + sql "sync;" + sql "insert into ${table1}(k1,c3,c4) values(1,666,666),(3,555,555);" + } + + Thread.sleep(400) + + // let the first partial update load finish + enable_block_in_publish("token1") + t1.join() + Thread.sleep(200) + check_rs_metas(5, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + if (startVersion == 0) { + // [0-1] + Assert.assertEquals(endVersion, 1) + Assert.assertEquals(numSegments, 0) + } else { + // [2-2], [3-3], [4-4], [5-5] + Assert.assertEquals(startVersion, endVersion) + Assert.assertEquals(numSegments, 1) + } + }) + + // trigger full compaction on tablet + logger.info("trigger compaction on another BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}") + def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + Assert.assertEquals(code, 0) + def compactJson = parseJson(out.trim()) + Assert.assertEquals("success", compactJson.status.toLowerCase()) + + // wait for full compaction to complete + Awaitility.await().atMost(3, TimeUnit.SECONDS).pollDelay(200, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until( + { + (code, out, err) = be_get_compaction_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + Assert.assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + Assert.assertEquals("success", compactionStatus.status.toLowerCase()) + return !compactionStatus.run_status + } + ) + + check_rs_metas(1, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + // check the rowset produced by full compaction + // [0-5] + Assert.assertEquals(startVersion, 0) + Assert.assertEquals(endVersion, 5) + Assert.assertEquals(numRows, 3) + Assert.assertEquals(overlapPb, "NONOVERLAPPING") + }) + + // let the second partial update load publish + disable_block_in_publish() + t1.join() + Thread.sleep(300) + + order_qt_sql "select * from ${table1};" + + check_rs_metas(2, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + if (startVersion == 6) { + // [6-6] + Assert.assertEquals(endVersion, 6) + // checks that partial update didn't skip the alignment process of rowsets produced by compaction and + // generate new segment in publish phase + Assert.assertEquals(numSegments, 2) + Assert.assertEquals(numRows, 4) // 4 = 2 + 2 + } + }) + + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } + + // sql "DROP TABLE IF EXISTS ${table1};" +} diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy index 4cdcf2e944d2a4..9316ded81a712e 100644 --- a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy +++ b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy @@ -88,6 +88,20 @@ suite("test_partial_update_conflict_skip_compaction", "nonConcurrent") { } }) + def enable_publish_spin_wait = { + if (isCloudMode()) { + } else { + GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait") + } + } + + def disable_publish_spin_wait = { + if (isCloudMode()) { + } else { + GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait") + } + } + def enable_block_in_publish = { if (isCloudMode()) { GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") @@ -109,6 +123,7 @@ suite("test_partial_update_conflict_skip_compaction", "nonConcurrent") { GetDebugPoint().clearDebugPointsForAllBEs() // block the partial update before publish phase + enable_publish_spin_wait() enable_block_in_publish() // the first partial update load diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy index 2cb99b87f8693d..6dab245970a032 100644 --- a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy +++ b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy @@ -88,6 +88,20 @@ suite("test_partial_update_skip_compaction", "nonConcurrent") { } }) + def enable_publish_spin_wait = { + if (isCloudMode()) { + } else { + GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait") + } + } + + def disable_publish_spin_wait = { + if (isCloudMode()) { + } else { + GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait") + } + } + def enable_block_in_publish = { if (isCloudMode()) { GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") @@ -109,6 +123,7 @@ suite("test_partial_update_skip_compaction", "nonConcurrent") { GetDebugPoint().clearDebugPointsForAllBEs() // block the partial update in publish phase + enable_publish_spin_wait() enable_block_in_publish() def t1 = Thread.start { sql "set enable_unique_key_partial_update=true;" From 23a1629b7963cda9bdb01452d71bf30ca39904bd Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Wed, 31 Jul 2024 08:54:07 +0800 Subject: [PATCH 8/9] add debug point for cloud --- .../transaction/CloudGlobalTransactionMgr.java | 17 +++++++++++++---- ...update_compaction_with_higher_version.groovy | 4 +++- ...rtial_update_conflict_skip_compaction.groovy | 2 ++ .../test_partial_update_skip_compaction.groovy | 2 ++ 4 files changed, 20 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index 9323f8ef6c86ed..b153fd006c4b99 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -715,16 +715,25 @@ private void getDeleteBitmapUpdateLock(Map> tableToParttions, lo LOG.info("error ", e); } } - if (DebugPointUtil.isEnable("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")) { - LOG.info("debug point: block at CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block"); + if (DebugPointUtil.isEnable("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")) { + LOG.info("debug point: block at CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait"); + DebugPoint debugPoint = DebugPointUtil.getDebugPoint( + "CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait"); + String token = debugPoint.param("token", "invalid_token"); while (DebugPointUtil.isEnable("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")) { + DebugPoint blockDebugPoint = DebugPointUtil.getDebugPoint( + "CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block"); + String passToken = blockDebugPoint.param("pass_token", ""); + if (token.equals(passToken)) { + break; + } try { - Thread.sleep(200); + Thread.sleep(100); } catch (InterruptedException e) { LOG.info("error ", e); } } - LOG.info("debug point: leave CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock"); + LOG.info("debug point: leave CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait"); } StopWatch stopWatch = new StopWatch(); stopWatch.start(); diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.groovy index 0d875d68ea37ac..c0c5fea6d2f054 100644 --- a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.groovy +++ b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.groovy @@ -91,6 +91,7 @@ suite("test_partial_update_compaction_with_higher_version", "nonConcurrent") { def enable_publish_spin_wait = { tokenName -> if (isCloudMode()) { + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait", [token: "${tokenName}"]) } else { GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait", [token: "${tokenName}"]) } @@ -98,6 +99,7 @@ suite("test_partial_update_compaction_with_higher_version", "nonConcurrent") { def disable_publish_spin_wait = { if (isCloudMode()) { + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") } else { GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait") } @@ -105,7 +107,7 @@ suite("test_partial_update_compaction_with_higher_version", "nonConcurrent") { def enable_block_in_publish = { passToken -> if (isCloudMode()) { - GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block", [pass_token: "${passToken}"]) } else { GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block", [pass_token: "${passToken}"]) } diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy index 9316ded81a712e..08eba337af3327 100644 --- a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy +++ b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy @@ -90,6 +90,7 @@ suite("test_partial_update_conflict_skip_compaction", "nonConcurrent") { def enable_publish_spin_wait = { if (isCloudMode()) { + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") } else { GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait") } @@ -97,6 +98,7 @@ suite("test_partial_update_conflict_skip_compaction", "nonConcurrent") { def disable_publish_spin_wait = { if (isCloudMode()) { + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") } else { GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait") } diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy index 6dab245970a032..d816c30f7e9bd8 100644 --- a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy +++ b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy @@ -90,6 +90,7 @@ suite("test_partial_update_skip_compaction", "nonConcurrent") { def enable_publish_spin_wait = { if (isCloudMode()) { + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") } else { GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait") } @@ -97,6 +98,7 @@ suite("test_partial_update_skip_compaction", "nonConcurrent") { def disable_publish_spin_wait = { if (isCloudMode()) { + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") } else { GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait") } From f769d9a410d51089318f0c4e276c8d657ff2c06c Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Wed, 31 Jul 2024 11:32:54 +0800 Subject: [PATCH 9/9] fix case --- be/src/olap/base_tablet.cpp | 2 -- .../test_partial_update_compaction_with_higher_version.groovy | 1 - .../test_partial_update_publish_conflict_with_error.groovy | 4 ++++ 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 211aeee3bd52b6..141e302af8c420 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -1252,8 +1252,6 @@ Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInf if (is_partial_update) { int64_t max_version_in_flush_phase = txn_info->partial_update_info->max_version_in_flush_phase; - LOG_INFO("BaseTablet::update_delete_bitmap, max_version_in_flush_phase={}", - max_version_in_flush_phase); DCHECK(max_version_in_flush_phase != -1); std::vector remained_rowsets; for (const auto& rowset : specified_rowsets) { diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.groovy index c0c5fea6d2f054..7af53662dd2d21 100644 --- a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.groovy +++ b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.groovy @@ -21,7 +21,6 @@ import org.awaitility.Awaitility suite("test_partial_update_compaction_with_higher_version", "nonConcurrent") { - // DOING(baohan) def table1 = "test_partial_update_compaction_with_higher_version" sql "DROP TABLE IF EXISTS ${table1} FORCE;" sql """ CREATE TABLE IF NOT EXISTS ${table1} ( diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_publish_conflict_with_error.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_publish_conflict_with_error.groovy index 9e61dd4eb0de9b..a3e18194318747 100644 --- a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_publish_conflict_with_error.groovy +++ b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_publish_conflict_with_error.groovy @@ -35,6 +35,10 @@ import org.apache.http.client.methods.CloseableHttpResponse import org.apache.http.util.EntityUtils suite("test_partial_update_publish_conflict_with_error", "nonConcurrent") { + if (isCloudMode()) { + return + } + def dbName = context.config.getDbNameByFile(context.file) def tableName = "test_partial_update_publish_conflict_with_error"