diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index fc20ecab35d45f..8882021f2a3313 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -63,6 +63,7 @@ #include "olap/wrapper_field.h" #include "runtime/memory/mem_tracker.h" #include "runtime/runtime_state.h" +#include "util/debug_points.h" #include "util/defer_op.h" #include "util/trace.h" #include "vec/aggregate_functions/aggregate_function.h" @@ -711,6 +712,13 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& << " res=" << res; return res; } + new_tablet->set_alter_failed(false); + Defer defer([&new_tablet] { + // if tablet state is not TABLET_RUNNING when return, indicates that alter has failed. + if (new_tablet->tablet_state() != TABLET_RUNNING) { + new_tablet->set_alter_failed(true); + } + }); LOG(INFO) << "finish to validate alter tablet request. begin to convert data from base tablet " "to new tablet" @@ -919,7 +927,8 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& std::lock_guard wrlock(_mutex); _tablet_ids_in_converting.insert(new_tablet->tablet_id()); } - res = _convert_historical_rowsets(sc_params); + int64_t real_alter_version = 0; + res = _convert_historical_rowsets(sc_params, &real_alter_version); { std::lock_guard wrlock(_mutex); _tablet_ids_in_converting.erase(new_tablet->tablet_id()); @@ -928,65 +937,12 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& break; } - // For unique with merge-on-write table, should process delete bitmap here. - // 1. During double write, the newly imported rowsets does not calculate - // delete bitmap and publish successfully. - // 2. After conversion, calculate delete bitmap for the rowsets imported - // during double write. During this period, new data can still be imported - // witout calculating delete bitmap and publish successfully. - // 3. Block the new publish, calculate the delete bitmap of the - // incremental rowsets. - // 4. Switch the tablet status to TABLET_RUNNING. The newly imported - // data will calculate delete bitmap. if (new_tablet->keys_type() == UNIQUE_KEYS && new_tablet->enable_unique_key_merge_on_write()) { - // step 2 - int64_t max_version = new_tablet->max_version().second; - std::vector rowsets; - if (end_version < max_version) { - LOG(INFO) - << "alter table for unique with merge-on-write, calculate delete bitmap of " - << "double write rowsets for version: " << end_version + 1 << "-" - << max_version; - RETURN_IF_ERROR(new_tablet->capture_consistent_rowsets( - {end_version + 1, max_version}, &rowsets)); - } - for (auto rowset_ptr : rowsets) { - if (rowset_ptr->version().second <= end_version) { - continue; - } - std::lock_guard rwlock(new_tablet->get_rowset_update_lock()); - std::shared_lock wrlock(new_tablet->get_header_lock()); - RETURN_IF_ERROR(new_tablet->update_delete_bitmap_without_lock(rowset_ptr)); - } - - // step 3 - std::lock_guard rwlock(new_tablet->get_rowset_update_lock()); - std::lock_guard new_wlock(new_tablet->get_header_lock()); - SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD); - int64_t new_max_version = new_tablet->max_version_unlocked().second; - rowsets.clear(); - if (max_version < new_max_version) { - LOG(INFO) - << "alter table for unique with merge-on-write, calculate delete bitmap of " - << "incremental rowsets for version: " << max_version + 1 << "-" - << new_max_version; - RETURN_IF_ERROR(new_tablet->capture_consistent_rowsets( - {max_version + 1, new_max_version}, &rowsets)); - } - for (auto rowset_ptr : rowsets) { - if (rowset_ptr->version().second <= max_version) { - continue; - } - RETURN_IF_ERROR(new_tablet->update_delete_bitmap_without_lock(rowset_ptr)); - } - - // step 4 - res = new_tablet->set_tablet_state(TabletState::TABLET_RUNNING); + res = _calc_delete_bitmap_for_mow_table(new_tablet, real_alter_version); if (!res) { break; } - new_tablet->save_meta(); } else { // set state to ready std::lock_guard new_wlock(new_tablet->get_header_lock()); @@ -1036,7 +992,10 @@ Status SchemaChangeHandler::_get_versions_to_be_changed( return Status::OK(); } -Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams& sc_params) { +// The `real_alter_version` parameter indicates that the version of [0-real_alter_version] is +// converted from a base tablet, only used for the mow table now. +Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams& sc_params, + int64_t* real_alter_version) { LOG(INFO) << "begin to convert historical rowsets for new_tablet from base_tablet." << " base_tablet=" << sc_params.base_tablet->full_name() << ", new_tablet=" << sc_params.new_tablet->full_name(); @@ -1148,7 +1107,7 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams << "tablet=" << sc_params.new_tablet->full_name() << ", version='" << rs_reader->version().first << "-" << rs_reader->version().second; StorageEngine::instance()->add_unused_rowset(new_rowset); - res = Status::OK(); + return process_alter_exit(); } else if (!res) { LOG(WARNING) << "failed to register new version. " << " tablet=" << sc_params.new_tablet->full_name() @@ -1161,6 +1120,7 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams << ", version=" << rs_reader->version().first << "-" << rs_reader->version().second; } + *real_alter_version = rs_reader->version().second; VLOG_TRACE << "succeed to convert a history version." << " version=" << rs_reader->version().first << "-" @@ -1384,4 +1344,67 @@ Status SchemaChangeHandler::_validate_alter_result(TabletSharedPtr new_tablet, return Status::OK(); } +// For unique with merge-on-write table, should process delete bitmap here. +// 1. During double write, the newly imported rowsets does not calculate +// delete bitmap and publish successfully. +// 2. After conversion, calculate delete bitmap for the rowsets imported +// during double write. During this period, new data can still be imported +// witout calculating delete bitmap and publish successfully. +// 3. Block the new publish, calculate the delete bitmap of the +// incremental rowsets. +// 4. Switch the tablet status to TABLET_RUNNING. The newly imported +// data will calculate delete bitmap. +Status SchemaChangeHandler::_calc_delete_bitmap_for_mow_table(TabletSharedPtr new_tablet, + int64_t alter_version) { + DBUG_EXECUTE_IF("SchemaChangeHandler._calc_delete_bitmap_for_mow_table.random_failed", { + if (rand() % 100 < (100 * dp->param("percent", 0.1))) { + LOG_WARNING("SchemaChangeHandler._calc_delete_bitmap_for_mow_table.random_failed"); + return Status::InternalError("debug schema change calc delete bitmap random failed"); + } + }); + + // can't do compaction when calc delete bitmap, if the rowset being calculated does + // a compaction, it may cause the delete bitmap to be missed. + std::lock_guard base_compaction_lock(new_tablet->get_base_compaction_lock()); + std::lock_guard cumu_compaction_lock(new_tablet->get_cumulative_compaction_lock()); + + // step 2 + int64_t max_version = new_tablet->max_version().second; + std::vector rowsets; + if (alter_version < max_version) { + LOG(INFO) << "alter table for unique with merge-on-write, calculate delete bitmap of " + << "double write rowsets for version: " << alter_version + 1 << "-" << max_version + << " new_tablet=" << new_tablet->tablet_id(); + std::shared_lock rlock(new_tablet->get_header_lock()); + RETURN_IF_ERROR( + new_tablet->capture_consistent_rowsets({alter_version + 1, max_version}, &rowsets)); + } + for (auto rowset_ptr : rowsets) { + std::lock_guard rwlock(new_tablet->get_rowset_update_lock()); + std::shared_lock rlock(new_tablet->get_header_lock()); + RETURN_IF_ERROR(new_tablet->update_delete_bitmap_without_lock(rowset_ptr)); + } + + // step 3 + std::lock_guard rwlock(new_tablet->get_rowset_update_lock()); + std::lock_guard new_wlock(new_tablet->get_header_lock()); + SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD); + int64_t new_max_version = new_tablet->max_version_unlocked().second; + rowsets.clear(); + if (max_version < new_max_version) { + LOG(INFO) << "alter table for unique with merge-on-write, calculate delete bitmap of " + << "incremental rowsets for version: " << max_version + 1 << "-" + << new_max_version << " new_tablet=" << new_tablet->tablet_id(); + RETURN_IF_ERROR(new_tablet->capture_consistent_rowsets({max_version + 1, new_max_version}, + &rowsets)); + } + for (auto rowset_ptr : rowsets) { + RETURN_IF_ERROR(new_tablet->update_delete_bitmap_without_lock(rowset_ptr)); + } + // step 4 + RETURN_IF_ERROR(new_tablet->set_tablet_state(TabletState::TABLET_RUNNING)); + new_tablet->save_meta(); + return Status::OK(); +} + } // namespace doris diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h index 2c5075b9ce5845..60942f671b0ffd 100644 --- a/be/src/olap/schema_change.h +++ b/be/src/olap/schema_change.h @@ -272,7 +272,8 @@ class SchemaChangeHandler { static Status _validate_alter_result(TabletSharedPtr new_tablet, const TAlterTabletReqV2& request); - static Status _convert_historical_rowsets(const SchemaChangeParams& sc_params); + static Status _convert_historical_rowsets(const SchemaChangeParams& sc_params, + int64_t* real_alter_version); static Status _parse_request(const SchemaChangeParams& sc_params, BlockChanger* changer, bool* sc_sorting, bool* sc_directly); @@ -281,6 +282,9 @@ class SchemaChangeHandler { static Status _init_column_mapping(ColumnMapping* column_mapping, const TabletColumn& column_schema, const std::string& value); + static Status _calc_delete_bitmap_for_mow_table(TabletSharedPtr new_tablet, + int64_t alter_version); + static std::shared_mutex _mutex; static std::unordered_set _tablet_ids_in_converting; static std::set _supported_functions; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index a7dc17ef7643bc..1699756d01e138 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -112,6 +112,7 @@ #include "segment_loader.h" #include "service/point_query_executor.h" #include "util/bvar_helper.h" +#include "util/debug_points.h" #include "util/defer_op.h" #include "util/doris_metrics.h" #include "util/pretty_printer.h" @@ -1013,14 +1014,6 @@ bool Tablet::can_do_compaction(size_t path_hash, CompactionType compaction_type) return false; } - // unique key table with merge-on-write also cann't do cumulative compaction under alter - // process. It may cause the delete bitmap calculation error, such as two - // rowsets have same key. - if (tablet_state() != TABLET_RUNNING && keys_type() == UNIQUE_KEYS && - enable_unique_key_merge_on_write()) { - return false; - } - if (data_dir()->path_hash() != path_hash || !is_used() || !init_succeeded()) { return false; } @@ -1746,6 +1739,13 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info, } } + // There are two cases when tablet state is TABLET_NOTREADY + // case 1: tablet is doing schema change. Fe knows it's state, doing nothing. + // case 2: tablet has finished schema change, but failed. Fe will perform recovery. + if (tablet_state() == TABLET_NOTREADY && is_alter_failed()) { + tablet_info->__set_used(false); + } + if (tablet_state() == TABLET_SHUTDOWN) { tablet_info->__set_used(false); } @@ -3297,6 +3297,13 @@ void Tablet::_rowset_ids_difference(const RowsetIdUnorderedSet& cur, // The caller should hold _rowset_update_lock and _meta_lock lock. Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset) { + DBUG_EXECUTE_IF("Tablet.update_delete_bitmap_without_lock.random_failed", { + if (rand() % 100 < (100 * dp->param("percent", 0.1))) { + LOG_WARNING("Tablet.update_delete_bitmap_without_lock.random_failed"); + return Status::InternalError( + "debug tablet update delete bitmap without lock random failed"); + } + }); int64_t cur_version = rowset->end_version(); std::vector segments; RETURN_IF_ERROR(_load_rowset_segments(rowset, &segments)); diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 2a3c2cf424859b..626f11031dc04a 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -570,6 +570,8 @@ class Tablet : public BaseTablet { Status check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap, int64_t max_version, int64_t txn_id, const RowsetIdUnorderedSet& rowset_ids, std::vector* rowsets = nullptr); + void set_alter_failed(bool alter_failed) { _alter_failed = alter_failed; } + bool is_alter_failed() { return _alter_failed; } private: Status _init_once_action(); @@ -704,6 +706,8 @@ class Tablet : public BaseTablet { // may delete compaction input rowsets. std::mutex _cold_compaction_lock; int64_t _last_failed_follow_cooldown_time = 0; + // `_alter_failed` is used to indicate whether the tablet failed to perform a schema change + std::atomic _alter_failed = false; DISALLOW_COPY_AND_ASSIGN(Tablet); diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index 8f27cea82b9449..8a9d6de7690a95 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -165,12 +165,19 @@ Status EngineCloneTask::_do_clone() { StorageEngine::instance()->tablet_manager()->get_tablet(_clone_req.tablet_id); // The status of a tablet is not ready, indicating that it is a residual tablet after a schema - // change failure. It should not provide normal read and write, so drop it here. + // change failure. Clone a new tablet from remote be to overwrite it. This situation basically only + // occurs when the be_rebalancer_fuzzy_test configuration is enabled. if (tablet && tablet->tablet_state() == TABLET_NOTREADY) { LOG(WARNING) << "tablet state is not ready when clone, need to drop old tablet, tablet_id=" << tablet->tablet_id(); + // can not drop tablet when under clone. so unregister clone tablet firstly. + StorageEngine::instance()->tablet_manager()->unregister_clone_tablet(_clone_req.tablet_id); RETURN_IF_ERROR(StorageEngine::instance()->tablet_manager()->drop_tablet( tablet->tablet_id(), tablet->replica_id(), false)); + if (!StorageEngine::instance()->tablet_manager()->register_clone_tablet( + _clone_req.tablet_id)) { + return Status::InternalError("tablet {} is under clone", _clone_req.tablet_id); + } tablet.reset(); } bool is_new_tablet = tablet == nullptr;