Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ Status OlapScanner::prepare(
// the rowsets maybe compacted when the last olap scanner starts
Version rd_version(0, _version);
Status acquire_reader_st =
_tablet->capture_rs_readers(rd_version, &_tablet_reader_params.rs_readers);
_tablet->capture_rs_readers(rd_version, &_tablet_reader_params.rs_readers,
_runtime_state->skip_missing_version());
if (!acquire_reader_st.ok()) {
LOG(WARNING) << "fail to init reader.res=" << acquire_reader_st;
std::stringstream ss;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2138,7 +2138,7 @@ Status SchemaChangeHandler::_get_versions_to_be_changed(
*max_rowset = rowset;

RETURN_NOT_OK(base_tablet->capture_consistent_versions(Version(0, rowset->version().second),
versions_to_be_changed));
versions_to_be_changed, false, false));

return Status::OK();
}
Expand Down
19 changes: 13 additions & 6 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ void Tablet::delete_expired_stale_rowset() {
Version test_version = Version(0, lastest_delta->end_version());
stale_version_path_map[*path_id_iter] = version_path;

Status status = capture_consistent_versions(test_version, nullptr);
Status status = capture_consistent_versions(test_version, nullptr, false, false);
// 1. When there is no consistent versions, we must reconstruct the tracker.
if (!status.ok()) {
// 2. fetch missing version after delete
Expand Down Expand Up @@ -696,7 +696,8 @@ bool Tablet::_reconstruct_version_tracker_if_necessary() {
}

Status Tablet::capture_consistent_versions(const Version& spec_version,
std::vector<Version>* version_path, bool quiet) const {
std::vector<Version>* version_path,
bool skip_missing_version, bool quiet) const {
Status status =
_timestamped_version_tracker.capture_consistent_versions(spec_version, version_path);
if (!status.ok() && !quiet) {
Expand All @@ -715,6 +716,10 @@ Status Tablet::capture_consistent_versions(const Version& spec_version,
LOG(WARNING) << "status:" << status << ", tablet:" << full_name()
<< ", missed version for version:" << spec_version;
_print_missed_versions(missed_versions);
if (skip_missing_version) {
LOG(WARNING) << "force skipping missing version for tablet:" << full_name();
return Status::OK();
}
}
}
}
Expand All @@ -723,7 +728,7 @@ Status Tablet::capture_consistent_versions(const Version& spec_version,

Status Tablet::check_version_integrity(const Version& version, bool quiet) {
std::shared_lock rdlock(_meta_lock);
return capture_consistent_versions(version, nullptr, quiet);
return capture_consistent_versions(version, nullptr, false, quiet);
}

// If any rowset contains the specific version, it means the version already exist
Expand All @@ -747,7 +752,7 @@ void Tablet::acquire_version_and_rowsets(
Status Tablet::capture_consistent_rowsets(const Version& spec_version,
std::vector<RowsetSharedPtr>* rowsets) const {
std::vector<Version> version_path;
RETURN_NOT_OK(capture_consistent_versions(spec_version, &version_path));
RETURN_NOT_OK(capture_consistent_versions(spec_version, &version_path, false, false));
RETURN_NOT_OK(_capture_consistent_rowsets_unlocked(version_path, rowsets));
return Status::OK();
}
Expand Down Expand Up @@ -784,9 +789,11 @@ Status Tablet::_capture_consistent_rowsets_unlocked(const std::vector<Version>&
}

Status Tablet::capture_rs_readers(const Version& spec_version,
std::vector<RowsetReaderSharedPtr>* rs_readers) const {
std::vector<RowsetReaderSharedPtr>* rs_readers,
bool skip_missing_version) const {
std::vector<Version> version_path;
RETURN_NOT_OK(capture_consistent_versions(spec_version, &version_path));
RETURN_NOT_OK(
capture_consistent_versions(spec_version, &version_path, skip_missing_version, false));
RETURN_NOT_OK(capture_rs_readers(version_path, rs_readers));
return Status::OK();
}
Expand Down
7 changes: 5 additions & 2 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,10 @@ class Tablet : public BaseTablet {

// Given spec_version, find a continuous version path and store it in version_path.
// If quiet is true, then only "does this path exist" is returned.
// If skip_missing_version is true, return OK even if there are missing versions.
Status capture_consistent_versions(const Version& spec_version,
std::vector<Version>* version_path,
bool quiet = false) const;
bool skip_missing_version, bool quiet) const;
// if quiet is true, no error log will be printed if there are missing versions
Status check_version_integrity(const Version& version, bool quiet = false);
bool check_version_exist(const Version& version) const;
Expand All @@ -159,8 +160,10 @@ class Tablet : public BaseTablet {

Status capture_consistent_rowsets(const Version& spec_version,
std::vector<RowsetSharedPtr>* rowsets) const;
// If skip_missing_version is true, skip versions if they are missing.
Status capture_rs_readers(const Version& spec_version,
std::vector<RowsetReaderSharedPtr>* rs_readers) const;
std::vector<RowsetReaderSharedPtr>* rs_readers,
bool skip_missing_version) const;

Status capture_rs_readers(const std::vector<Version>& version_path,
std::vector<RowsetReaderSharedPtr>* rs_readers) const;
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,10 @@ class RuntimeState {
return _query_options.__isset.skip_delete_bitmap && _query_options.skip_delete_bitmap;
}

// Reports whether the query asked to skip missing versions: true only when the
// skip_missing_version query option is both present (__isset) and set to true.
bool skip_missing_version() const {
    const bool option_present = _query_options.__isset.skip_missing_version;
    return option_present ? _query_options.skip_missing_version : false;
}

int partitioned_hash_join_rows_threshold() const {
if (!_query_options.__isset.partitioned_hash_join_rows_threshold) {
return 0;
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/exec/scan/new_olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ Status NewOlapScanner::prepare(const TPaloScanRange& scan_range,
// to prevent this case: when there are lots of olap scanners to run for example 10000
// the rowsets maybe compacted when the last olap scanner starts
Version rd_version(0, _version);
Status acquire_reader_st =
_tablet->capture_rs_readers(rd_version, &_tablet_reader_params.rs_readers);
Status acquire_reader_st = _tablet->capture_rs_readers(
rd_version, &_tablet_reader_params.rs_readers, _state->skip_missing_version());
if (!acquire_reader_st.ok()) {
LOG(WARNING) << "fail to init reader.res=" << acquire_reader_st;
std::stringstream ss;
Expand Down
4 changes: 2 additions & 2 deletions be/test/olap/tablet_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,12 +300,12 @@ TEST_F(TestTablet, pad_rowset) {

Version version(5, 5);
std::vector<RowsetReaderSharedPtr> readers;
ASSERT_FALSE(_tablet->capture_rs_readers(version, &readers).ok());
ASSERT_FALSE(_tablet->capture_rs_readers(version, &readers, false).ok());
readers.clear();

PadRowsetAction action;
action._pad_rowset(_tablet, version);
ASSERT_TRUE(_tablet->capture_rs_readers(version, &readers).ok());
ASSERT_TRUE(_tablet->capture_rs_readers(version, &readers, false).ok());
}

TEST_F(TestTablet, cooldown_policy) {
Expand Down
14 changes: 0 additions & 14 deletions docs/en/docs/admin-manual/config/fe-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -1805,20 +1805,6 @@ In some very special circumstances, such as code bugs, or human misoperation, et

Set to true so that Doris will automatically use blank replicas to fill tablets which all replicas have been damaged or missing

#### `recover_with_skip_missing_version`

Default:disable

IsMutable:true

MasterOnly:true

In some scenarios, there is an unrecoverable metadata problem in the cluster, and the visibleVersion of the data does not match be. In this case, it is still necessary to restore the remaining data (which may cause problems with the correctness of the data). This configuration is the same as` recover_with_empty_tablet` should only be used in emergency situations
This configuration has three values:
* disable : If an exception occurs, an error will be reported normally.
* ignore_version: ignore the visibleVersion information recorded in fe partition, use replica version
* ignore_all: In addition to ignore_version, when encountering no queryable replica, skip it directly instead of throwing an exception

#### `min_clone_task_timeout_sec` `And max_clone_task_timeout_sec`

Default:Minimum 3 minutes, maximum two hours
Expand Down
4 changes: 4 additions & 0 deletions docs/en/docs/advanced/variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,10 @@ Translated with www.DeepL.com/Translator (free version)

For debugging purpose. In Unique Key MoW table, in case of problems of reading data, setting value to `true` will also read deleted data.

* `skip_missing_version`

In some scenarios, all replicas of a tablet have missing versions and the tablet cannot be recovered. This variable controls the behavior of queries in that situation. When it is set to `true`, the query ignores the visible version recorded in the FE partition and uses the replica version instead. If the replica on a BE has missing versions, the query skips those missing versions and only returns the data of the versions that still exist. In addition, the query always tries to select the replica with the highest lastSuccessVersion among all surviving BE replicas, so as to recover as much data as possible. You should only enable it in the emergency scenarios mentioned above, and only for temporary recovery queries. Note that this variable conflicts with the `use_fix_replica` variable: when `use_fix_replica` is not -1, this variable has no effect.

* `default_password_lifetime`

Default password expiration time. The default value is 0, which means no expiration. The unit is days. This parameter is only enabled if the user's password expiration property has a value of DEFAULT. like:
Expand Down
18 changes: 0 additions & 18 deletions docs/zh-CN/docs/admin-manual/config/fe-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -1805,24 +1805,6 @@ show data (其他用法:HELP SHOW DATA)

在这种情况下,您可以将此配置设置为 true。 系统会将损坏的 tablet 替换为空 tablet,以确保查询可以执行。 (但此时数据已经丢失,所以查询结果可能不准确)

#### `recover_with_skip_missing_version`

默认值:disable

是否可以动态配置:true

是否为 Master FE 节点独有的配置项:true

有些场景下集群出现了不可恢复的元数据问题,数据已的visibleversion 已经和be 不匹配,

这种情况下仍然需要恢复剩余的数据(可能能会导致数据的正确性有问题),这个配置同`recover_with_empty_tablet` 一样只能在紧急情况下使用

这个配置有三个值:

* disable :出现异常会正常报错。
* ignore_version: 忽略 fe partition 中记录的visibleVersion 信息, 使用replica version
* ignore_all: 除了ignore_version, 在遇到找不到可查询的replica 时,直接跳过而不是抛出异常

#### `min_clone_task_timeout_sec` 和 `max_clone_task_timeout_sec`

默认值:最小3分钟,最大两小时
Expand Down
4 changes: 4 additions & 0 deletions docs/zh-CN/docs/advanced/variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,10 @@ try (Connection conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:9030/

用于调试目的。在Unique Key MoW表中,当发现读取表的数据结果有误的时候,把此变量的值设置为`true`,将会把被delete bitmap标记删除的数据当成正常数据读取。

* `skip_missing_version`

有些极端场景下，表的 Tablet 下的所有副本都有版本缺失，使得这些 Tablet 没有办法被恢复，导致整张表都不能查询。这个变量可以用来控制查询的行为，当设置为`true`时，查询会忽略 FE partition 中记录的 visibleVersion，使用 replica version。如果 Be 上的 Replica 有缺失的版本，则查询会直接跳过这些缺失的版本，只返回仍存在版本的数据。此外，查询将会总是选择所有存活的 BE 中所有 Replica 里 lastSuccessVersion 最大的那一个，这样可以尽可能的恢复更多的数据。这个变量应该只在上述紧急情况下才被设置为`true`，仅用于临时让表恢复查询。注意，此变量与 use_fix_replica 变量冲突，当 use_fix_replica 变量不等于 -1 时，此变量会不起作用。

* `default_password_lifetime`

默认的密码过期时间。默认值为 0,即表示不过期。单位为天。该参数只有当用户的密码过期属性为 DEFAULT 值时,才启用。如:
Expand Down
14 changes: 0 additions & 14 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1406,20 +1406,6 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static boolean recover_with_empty_tablet = false;

/**
* In some scenarios, there is an unrecoverable metadata problem in the cluster,
* and the visibleVersion of the data does not match be. In this case, it is still
* necessary to restore the remaining data (which may cause problems with the correctness of the data).
* This configuration is the same as` recover_with_empty_tablet` should only be used in emergency situations
* This configuration has three values:
* disable : If an exception occurs, an error will be reported normally.
* ignore_version: ignore the visibleVersion information recorded in fe partition, use replica version
* ignore_all: In addition to ignore_version, when encountering no queryable replica,
* skip it directly instead of throwing an exception
*/
@ConfField(mutable = true, masterOnly = true)
public static String recover_with_skip_missing_version = "disable";

/**
* Whether to add a delete sign column when create unique table
*/
Expand Down
18 changes: 18 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
public class Replica implements Writable {
private static final Logger LOG = LogManager.getLogger(Replica.class);
public static final VersionComparator<Replica> VERSION_DESC_COMPARATOR = new VersionComparator<Replica>();
public static final LastSuccessVersionComparator<Replica> LAST_SUCCESS_VERSION_COMPARATOR =
new LastSuccessVersionComparator<Replica>();
public static final IdComparator<Replica> ID_COMPARATOR = new IdComparator<Replica>();

public enum ReplicaState {
Expand Down Expand Up @@ -528,6 +530,22 @@ public int compare(T replica1, T replica2) {
}
}

/**
 * Orders replicas by their last success version in descending order, so the
 * replica with the highest last success version sorts first.
 */
private static class LastSuccessVersionComparator<T extends Replica> implements Comparator<T> {
    public LastSuccessVersionComparator() {
    }

    @Override
    public int compare(T replica1, T replica2) {
        // Arguments are swapped on purpose: a larger last success version must
        // compare as "smaller" so that it ends up at the front after sorting.
        return Long.compare(replica2.getLastSuccessVersion(), replica1.getLastSuccessVersion());
    }
}

private static class IdComparator<T extends Replica> implements Comparator<T> {
public IdComparator() {
}
Expand Down
6 changes: 3 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
Original file line number Diff line number Diff line change
Expand Up @@ -214,16 +214,16 @@ public Multimap<Long, Long> getNormalReplicaBackendPathMap() {
}

// for query
public List<Replica> getQueryableReplicas(long visibleVersion) {
public List<Replica> getQueryableReplicas(long visibleVersion, boolean allowFailedVersion) {
List<Replica> allQueryableReplica = Lists.newArrayListWithCapacity(replicas.size());
List<Replica> auxiliaryReplica = Lists.newArrayListWithCapacity(replicas.size());
for (Replica replica : replicas) {
if (replica.isBad()) {
continue;
}

// Skip the missing version replica
if (replica.getLastFailedVersion() > 0) {
// Skip the missing version replica.
if (replica.getLastFailedVersion() > 0 && !allowFailedVersion) {
continue;
}

Expand Down
33 changes: 20 additions & 13 deletions fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -627,14 +627,19 @@ private void addScanRangeLocations(Partition partition,
String visibleVersionStr = String.valueOf(visibleVersion);

Set<Tag> allowedTags = Sets.newHashSet();
int useFixReplica = -1;
boolean needCheckTags = false;
boolean skipMissingVersion = false;
if (ConnectContext.get() != null) {
allowedTags = ConnectContext.get().getResourceTags();
needCheckTags = ConnectContext.get().isResourceTagsSet();
useFixReplica = ConnectContext.get().getSessionVariable().useFixReplica;
// skip_missing_version only takes effect when use_fix_replica is unset (-1); otherwise it is forced to false
skipMissingVersion = useFixReplica == -1 && ConnectContext.get().getSessionVariable().skipMissingVersion;
}
for (Tablet tablet : tablets) {
long tabletId = tablet.getId();
if (!Config.recover_with_skip_missing_version.equalsIgnoreCase("disable")) {
if (skipMissingVersion) {
long tabletVersion = -1L;
for (Replica replica : tablet.getReplicas()) {
if (replica.getVersion() > tabletVersion) {
Expand All @@ -657,7 +662,7 @@ private void addScanRangeLocations(Partition partition,
paloRange.setTabletId(tabletId);

// random shuffle List && only collect one copy
List<Replica> replicas = tablet.getQueryableReplicas(visibleVersion);
List<Replica> replicas = tablet.getQueryableReplicas(visibleVersion, skipMissingVersion);
if (replicas.isEmpty()) {
LOG.error("no queryable replica found in tablet {}. visible version {}",
tabletId, visibleVersion);
Expand All @@ -669,12 +674,13 @@ private void addScanRangeLocations(Partition partition,
throw new UserException("Failed to get scan range, no queryable replica found in tablet: " + tabletId);
}

int useFixReplica = -1;
if (ConnectContext.get() != null) {
useFixReplica = ConnectContext.get().getSessionVariable().useFixReplica;
}
if (useFixReplica == -1) {
Collections.shuffle(replicas);
if (skipMissingVersion) {
// sort by replica's last success version, higher success version in the front.
replicas.sort(Replica.LAST_SUCCESS_VERSION_COMPARATOR);
} else {
Collections.shuffle(replicas);
}
} else {
LOG.debug("use fix replica, value: {}, replica num: {}", useFixReplica, replicas.size());
// sort by replica id
Expand Down Expand Up @@ -721,14 +727,15 @@ private void addScanRangeLocations(Partition partition,
collectedStat = true;
}
scanBackendIds.add(backend.getId());
// For skipping missing version of tablet, we only select the backend with the highest last
// success version replica to save as much data as possible.
if (!tabletIsNull && skipMissingVersion) {
break;
}
}
if (tabletIsNull) {
if (Config.recover_with_skip_missing_version.equalsIgnoreCase("ignore_all")) {
continue;
} else {
throw new UserException(tabletId + " have no queryable replicas. err: "
+ Joiner.on(", ").join(errs));
}
throw new UserException(tabletId + " have no queryable replicas. err: "
+ Joiner.on(", ").join(errs));
}
TScanRange scanRange = new TScanRange();
scanRange.setPaloScanRange(paloRange);
Expand Down
Loading