Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 80 additions & 57 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
#include "olap/wrapper_field.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/runtime_state.h"
#include "util/debug_points.h"
#include "util/defer_op.h"
#include "util/trace.h"
#include "vec/aggregate_functions/aggregate_function.h"
Expand Down Expand Up @@ -711,6 +712,13 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
<< " res=" << res;
return res;
}
new_tablet->set_alter_failed(false);
Defer defer([&new_tablet] {
// If the tablet state is not TABLET_RUNNING when we return, the alter has failed.
if (new_tablet->tablet_state() != TABLET_RUNNING) {
new_tablet->set_alter_failed(true);
}
});

LOG(INFO) << "finish to validate alter tablet request. begin to convert data from base tablet "
"to new tablet"
Expand Down Expand Up @@ -919,7 +927,8 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
std::lock_guard<std::shared_mutex> wrlock(_mutex);
_tablet_ids_in_converting.insert(new_tablet->tablet_id());
}
res = _convert_historical_rowsets(sc_params);
int64_t real_alter_version = 0;
res = _convert_historical_rowsets(sc_params, &real_alter_version);
{
std::lock_guard<std::shared_mutex> wrlock(_mutex);
_tablet_ids_in_converting.erase(new_tablet->tablet_id());
Expand All @@ -928,65 +937,12 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
break;
}

// For unique with merge-on-write table, should process delete bitmap here.
// 1. During double write, the newly imported rowsets does not calculate
// delete bitmap and publish successfully.
// 2. After conversion, calculate delete bitmap for the rowsets imported
// during double write. During this period, new data can still be imported
// without calculating delete bitmap and publish successfully.
// 3. Block the new publish, calculate the delete bitmap of the
// incremental rowsets.
// 4. Switch the tablet status to TABLET_RUNNING. The newly imported
// data will calculate delete bitmap.
if (new_tablet->keys_type() == UNIQUE_KEYS &&
new_tablet->enable_unique_key_merge_on_write()) {
// step 2
int64_t max_version = new_tablet->max_version().second;
std::vector<RowsetSharedPtr> rowsets;
if (end_version < max_version) {
LOG(INFO)
<< "alter table for unique with merge-on-write, calculate delete bitmap of "
<< "double write rowsets for version: " << end_version + 1 << "-"
<< max_version;
RETURN_IF_ERROR(new_tablet->capture_consistent_rowsets(
{end_version + 1, max_version}, &rowsets));
}
for (auto rowset_ptr : rowsets) {
if (rowset_ptr->version().second <= end_version) {
continue;
}
std::lock_guard<std::mutex> rwlock(new_tablet->get_rowset_update_lock());
std::shared_lock<std::shared_mutex> wrlock(new_tablet->get_header_lock());
RETURN_IF_ERROR(new_tablet->update_delete_bitmap_without_lock(rowset_ptr));
}

// step 3
std::lock_guard<std::mutex> rwlock(new_tablet->get_rowset_update_lock());
std::lock_guard<std::shared_mutex> new_wlock(new_tablet->get_header_lock());
SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
int64_t new_max_version = new_tablet->max_version_unlocked().second;
rowsets.clear();
if (max_version < new_max_version) {
LOG(INFO)
<< "alter table for unique with merge-on-write, calculate delete bitmap of "
<< "incremental rowsets for version: " << max_version + 1 << "-"
<< new_max_version;
RETURN_IF_ERROR(new_tablet->capture_consistent_rowsets(
{max_version + 1, new_max_version}, &rowsets));
}
for (auto rowset_ptr : rowsets) {
if (rowset_ptr->version().second <= max_version) {
continue;
}
RETURN_IF_ERROR(new_tablet->update_delete_bitmap_without_lock(rowset_ptr));
}

// step 4
res = new_tablet->set_tablet_state(TabletState::TABLET_RUNNING);
res = _calc_delete_bitmap_for_mow_table(new_tablet, real_alter_version);
if (!res) {
break;
}
new_tablet->save_meta();
} else {
// set state to ready
std::lock_guard<std::shared_mutex> new_wlock(new_tablet->get_header_lock());
Expand Down Expand Up @@ -1036,7 +992,10 @@ Status SchemaChangeHandler::_get_versions_to_be_changed(
return Status::OK();
}

Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams& sc_params) {
// The `real_alter_version` parameter indicates that the version of [0-real_alter_version] is
// converted from a base tablet, only used for the mow table now.
Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams& sc_params,
int64_t* real_alter_version) {
LOG(INFO) << "begin to convert historical rowsets for new_tablet from base_tablet."
<< " base_tablet=" << sc_params.base_tablet->full_name()
<< ", new_tablet=" << sc_params.new_tablet->full_name();
Expand Down Expand Up @@ -1148,7 +1107,7 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
<< "tablet=" << sc_params.new_tablet->full_name() << ", version='"
<< rs_reader->version().first << "-" << rs_reader->version().second;
StorageEngine::instance()->add_unused_rowset(new_rowset);
res = Status::OK();
return process_alter_exit();
} else if (!res) {
LOG(WARNING) << "failed to register new version. "
<< " tablet=" << sc_params.new_tablet->full_name()
Expand All @@ -1161,6 +1120,7 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
<< ", version=" << rs_reader->version().first << "-"
<< rs_reader->version().second;
}
*real_alter_version = rs_reader->version().second;

VLOG_TRACE << "succeed to convert a history version."
<< " version=" << rs_reader->version().first << "-"
Expand Down Expand Up @@ -1384,4 +1344,67 @@ Status SchemaChangeHandler::_validate_alter_result(TabletSharedPtr new_tablet,
return Status::OK();
}

// Calculates the delete bitmap of a unique-key merge-on-write tablet at the end of a schema
// change and then switches the tablet to TABLET_RUNNING.
//
// Overall flow for a unique merge-on-write table:
// 1. During double write, the newly imported rowsets do not calculate a
//    delete bitmap and are published successfully.
// 2. After conversion, calculate the delete bitmap for the rowsets imported
//    during double write. During this period, new data can still be imported
//    without calculating a delete bitmap and be published successfully.
// 3. Block new publishes and calculate the delete bitmap of the
//    incremental rowsets.
// 4. Switch the tablet state to TABLET_RUNNING. Newly imported data will
//    calculate its delete bitmap from then on.
//
// Parameters:
//   new_tablet    - the tablet produced by the schema change.
//   alter_version - rowsets with versions in [alter_version + 1, max_version] are the ones
//                   handled in step 2.
//
// Returns the first non-OK status reported by the debug point, capture_consistent_rowsets,
// update_delete_bitmap_without_lock or set_tablet_state; Status::OK() otherwise.
Status SchemaChangeHandler::_calc_delete_bitmap_for_mow_table(TabletSharedPtr new_tablet,
                                                              int64_t alter_version) {
    DBUG_EXECUTE_IF("SchemaChangeHandler._calc_delete_bitmap_for_mow_table.random_failed", {
        if (rand() % 100 < (100 * dp->param("percent", 0.1))) {
            LOG_WARNING("SchemaChangeHandler._calc_delete_bitmap_for_mow_table.random_failed");
            return Status::InternalError("debug schema change calc delete bitmap random failed");
        }
    });

    // can't do compaction when calc delete bitmap, if the rowset being calculated does
    // a compaction, it may cause the delete bitmap to be missed.
    // Both compaction locks are held until this function returns.
    std::lock_guard base_compaction_lock(new_tablet->get_base_compaction_lock());
    std::lock_guard cumu_compaction_lock(new_tablet->get_cumulative_compaction_lock());

    // step 2
    int64_t max_version = new_tablet->max_version().second;
    std::vector<RowsetSharedPtr> rowsets;
    if (alter_version < max_version) {
        LOG(INFO) << "alter table for unique with merge-on-write, calculate delete bitmap of "
                  << "double write rowsets for version: " << alter_version + 1 << "-" << max_version
                  << " new_tablet=" << new_tablet->tablet_id();
        // Shared header lock only for the duration of the capture.
        std::shared_lock<std::shared_mutex> rlock(new_tablet->get_header_lock());
        RETURN_IF_ERROR(
                new_tablet->capture_consistent_rowsets({alter_version + 1, max_version}, &rowsets));
    }
    // Bind by const reference: copying a shared_ptr per iteration costs an atomic
    // refcount inc/dec for nothing, since the callee takes `const RowsetSharedPtr&`.
    for (const auto& rowset_ptr : rowsets) {
        // Locks are taken and released per rowset, so publishes can still make progress
        // between iterations (see step 2 in the header comment).
        std::lock_guard<std::mutex> rwlock(new_tablet->get_rowset_update_lock());
        std::shared_lock<std::shared_mutex> rlock(new_tablet->get_header_lock());
        RETURN_IF_ERROR(new_tablet->update_delete_bitmap_without_lock(rowset_ptr));
    }

    // step 3
    // From here on the rowset update lock and the exclusive header lock are held until
    // return, i.e. across the state switch and save_meta() below.
    std::lock_guard<std::mutex> rwlock(new_tablet->get_rowset_update_lock());
    std::lock_guard<std::shared_mutex> new_wlock(new_tablet->get_header_lock());
    SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
    int64_t new_max_version = new_tablet->max_version_unlocked().second;
    rowsets.clear();
    if (max_version < new_max_version) {
        LOG(INFO) << "alter table for unique with merge-on-write, calculate delete bitmap of "
                  << "incremental rowsets for version: " << max_version + 1 << "-"
                  << new_max_version << " new_tablet=" << new_tablet->tablet_id();
        RETURN_IF_ERROR(new_tablet->capture_consistent_rowsets({max_version + 1, new_max_version},
                                                               &rowsets));
    }
    for (const auto& rowset_ptr : rowsets) {
        RETURN_IF_ERROR(new_tablet->update_delete_bitmap_without_lock(rowset_ptr));
    }
    // step 4
    RETURN_IF_ERROR(new_tablet->set_tablet_state(TabletState::TABLET_RUNNING));
    new_tablet->save_meta();
    return Status::OK();
}

} // namespace doris
6 changes: 5 additions & 1 deletion be/src/olap/schema_change.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,8 @@ class SchemaChangeHandler {
static Status _validate_alter_result(TabletSharedPtr new_tablet,
const TAlterTabletReqV2& request);

static Status _convert_historical_rowsets(const SchemaChangeParams& sc_params);
static Status _convert_historical_rowsets(const SchemaChangeParams& sc_params,
int64_t* real_alter_version);

static Status _parse_request(const SchemaChangeParams& sc_params, BlockChanger* changer,
bool* sc_sorting, bool* sc_directly);
Expand All @@ -281,6 +282,9 @@ class SchemaChangeHandler {
static Status _init_column_mapping(ColumnMapping* column_mapping,
const TabletColumn& column_schema, const std::string& value);

static Status _calc_delete_bitmap_for_mow_table(TabletSharedPtr new_tablet,
int64_t alter_version);

static std::shared_mutex _mutex;
static std::unordered_set<int64_t> _tablet_ids_in_converting;
static std::set<std::string> _supported_functions;
Expand Down
23 changes: 15 additions & 8 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
#include "segment_loader.h"
#include "service/point_query_executor.h"
#include "util/bvar_helper.h"
#include "util/debug_points.h"
#include "util/defer_op.h"
#include "util/doris_metrics.h"
#include "util/pretty_printer.h"
Expand Down Expand Up @@ -1013,14 +1014,6 @@ bool Tablet::can_do_compaction(size_t path_hash, CompactionType compaction_type)
return false;
}

// unique key table with merge-on-write also can't do cumulative compaction under alter
// process. It may cause the delete bitmap calculation error, such as two
// rowsets have same key.
if (tablet_state() != TABLET_RUNNING && keys_type() == UNIQUE_KEYS &&
enable_unique_key_merge_on_write()) {
return false;
}

if (data_dir()->path_hash() != path_hash || !is_used() || !init_succeeded()) {
return false;
}
Expand Down Expand Up @@ -1746,6 +1739,13 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info,
}
}

// There are two cases when the tablet state is TABLET_NOTREADY:
// case 1: the tablet is doing a schema change. FE knows its state, so do nothing.
// case 2: the tablet has finished a schema change, but it failed. FE will perform recovery.
if (tablet_state() == TABLET_NOTREADY && is_alter_failed()) {
tablet_info->__set_used(false);
}

if (tablet_state() == TABLET_SHUTDOWN) {
tablet_info->__set_used(false);
}
Expand Down Expand Up @@ -3297,6 +3297,13 @@ void Tablet::_rowset_ids_difference(const RowsetIdUnorderedSet& cur,

// The caller should hold _rowset_update_lock and _meta_lock lock.
Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset) {
DBUG_EXECUTE_IF("Tablet.update_delete_bitmap_without_lock.random_failed", {
if (rand() % 100 < (100 * dp->param("percent", 0.1))) {
LOG_WARNING("Tablet.update_delete_bitmap_without_lock.random_failed");
return Status::InternalError(
"debug tablet update delete bitmap without lock random failed");
}
});
int64_t cur_version = rowset->end_version();
std::vector<segment_v2::SegmentSharedPtr> segments;
RETURN_IF_ERROR(_load_rowset_segments(rowset, &segments));
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,8 @@ class Tablet : public BaseTablet {
Status check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap, int64_t max_version,
int64_t txn_id, const RowsetIdUnorderedSet& rowset_ids,
std::vector<RowsetSharedPtr>* rowsets = nullptr);
// Records whether the schema change on this tablet failed.
void set_alter_failed(bool alter_failed) {
    _alter_failed.store(alter_failed);
}
bool is_alter_failed() { return _alter_failed; }

private:
Status _init_once_action();
Expand Down Expand Up @@ -704,6 +706,8 @@ class Tablet : public BaseTablet {
// may delete compaction input rowsets.
std::mutex _cold_compaction_lock;
int64_t _last_failed_follow_cooldown_time = 0;
// `_alter_failed` is used to indicate whether the tablet failed to perform a schema change
std::atomic<bool> _alter_failed = false;

DISALLOW_COPY_AND_ASSIGN(Tablet);

Expand Down
9 changes: 8 additions & 1 deletion be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,19 @@ Status EngineCloneTask::_do_clone() {
StorageEngine::instance()->tablet_manager()->get_tablet(_clone_req.tablet_id);

// The status of a tablet is not ready, indicating that it is a residual tablet after a schema
// change failure. It should not provide normal read and write, so drop it here.
// change failure. Clone a new tablet from remote be to overwrite it. This situation basically only
// occurs when the be_rebalancer_fuzzy_test configuration is enabled.
if (tablet && tablet->tablet_state() == TABLET_NOTREADY) {
LOG(WARNING) << "tablet state is not ready when clone, need to drop old tablet, tablet_id="
<< tablet->tablet_id();
// can not drop tablet when under clone. so unregister clone tablet firstly.
StorageEngine::instance()->tablet_manager()->unregister_clone_tablet(_clone_req.tablet_id);
RETURN_IF_ERROR(StorageEngine::instance()->tablet_manager()->drop_tablet(
tablet->tablet_id(), tablet->replica_id(), false));
if (!StorageEngine::instance()->tablet_manager()->register_clone_tablet(
_clone_req.tablet_id)) {
return Status::InternalError("tablet {} is under clone", _clone_req.tablet_id);
}
tablet.reset();
}
bool is_new_tablet = tablet == nullptr;
Expand Down