Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -552,10 +552,11 @@ bool Tablet::_reconstruct_version_tracker_if_necessary() {
}

OLAPStatus Tablet::capture_consistent_versions(const Version& spec_version,
std::vector<Version>* version_path) const {
std::vector<Version>* version_path,
bool quiet) const {
OLAPStatus status =
_timestamped_version_tracker.capture_consistent_versions(spec_version, version_path);
if (status != OLAP_SUCCESS) {
if (status != OLAP_SUCCESS && !quiet) {
std::vector<Version> missed_versions;
calc_missed_versions_unlocked(spec_version.second, &missed_versions);
if (missed_versions.empty()) {
Expand All @@ -577,9 +578,9 @@ OLAPStatus Tablet::capture_consistent_versions(const Version& spec_version,
return status;
}

OLAPStatus Tablet::check_version_integrity(const Version& version) {
OLAPStatus Tablet::check_version_integrity(const Version& version, bool quiet) {
ReadLock rdlock(&_meta_lock);
return capture_consistent_versions(version, nullptr);
return capture_consistent_versions(version, nullptr, quiet);
}

// If any rowset contains the specific version, it means the version already exist
Expand Down Expand Up @@ -1245,14 +1246,10 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info) {
tablet_info->schema_hash = _tablet_meta->schema_hash();
tablet_info->row_count = _tablet_meta->num_rows();
tablet_info->data_size = _tablet_meta->tablet_footprint();
Version version = {-1, 0};
_max_continuous_version_from_beginning_unlocked(&version);

tablet_info->__set_version_miss(false);
auto max_rowset = rowset_with_max_version();
if (max_rowset != nullptr) {
if (max_rowset->version() != version) {
tablet_info->__set_version_miss(true);
}
} else {
if (max_rowset == nullptr) {
// If the tablet is in running state, it must not be doing schema-change. so if we can not
// access its rowsets, it means that the tablet is bad and needs to be reported to the FE
// for subsequent repairs (through the cloning task)
Expand All @@ -1263,8 +1260,10 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info) {
// still sets the state to normal when reporting. Note that every task has a timeout,
// so if the task corresponding to this change hangs, when the task timeout, FE will know
// and perform state modification operations.
} else {
tablet_info->__set_version_miss(check_version_integrity({0, max_rowset->version().second}, true));
}
tablet_info->version = version.second;
tablet_info->version = max_rowset->version().second;
// Useless but it is a required field in TTabletInfo
tablet_info->version_hash = 0;
tablet_info->__set_partition_id(_tablet_meta->partition_id());
Expand Down
8 changes: 6 additions & 2 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,13 @@ class Tablet : public BaseTablet {
/// need to delete flag.
void delete_expired_stale_rowset();

// Given spec_version, find a continuous version path and store it in version_path.
// If quiet is true, then only "does this path exist" is returned.
OLAPStatus capture_consistent_versions(const Version& spec_version,
std::vector<Version>* version_path) const;
OLAPStatus check_version_integrity(const Version& version);
std::vector<Version>* version_path,
bool quiet = false) const;
// if quiet is true, no error log will be printed if there are missing versions
OLAPStatus check_version_integrity(const Version& version, bool quiet = false);
bool check_version_exist(const Version& version) const;
void acquire_version_and_rowsets(
std::vector<std::pair<Version, RowsetSharedPtr>>* version_rowsets) const;
Expand Down
33 changes: 33 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@
import org.apache.doris.mysql.privilege.PaloAuth;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.BackendIdsUpdateInfo;
import org.apache.doris.persist.BackendReplicasInfo;
import org.apache.doris.persist.BackendTabletsInfo;
import org.apache.doris.persist.ClusterInfo;
import org.apache.doris.persist.ColocatePersistInfo;
Expand Down Expand Up @@ -745,12 +746,15 @@ public Checkpoint getCheckpointer() {
/** Returns the {@code statisticsManager} instance held by this object. */
public StatisticsManager getStatisticsManager() {
    return this.statisticsManager;
}

/** Returns the {@code statisticsJobManager} instance held by this object. */
public StatisticsJobManager getStatisticsJobManager() {
    return this.statisticsJobManager;
}

/** Returns the {@code statisticsJobScheduler} instance held by this object. */
public StatisticsJobScheduler getStatisticsJobScheduler() {
    return this.statisticsJobScheduler;
}

/** Returns the {@code statisticsTaskScheduler} instance held by this object. */
public StatisticsTaskScheduler getStatisticsTaskScheduler() {
    return this.statisticsTaskScheduler;
}
Expand Down Expand Up @@ -6929,6 +6933,35 @@ public void setConfig(AdminSetConfigStmt stmt) throws DdlException {
}
}

/**
 * Replays a {@link BackendReplicasInfo} record by applying every reported replica
 * state to the matching replica on the reporting backend.
 *
 * <p>An entry whose replica cannot be found in {@code tabletInvertedIndex} is logged
 * and skipped. A {@code BAD} entry marks the replica as bad, a {@code MISSING_VERSION}
 * entry sets the replica's last failed version to 1, and any other type leaves the
 * replica untouched.
 *
 * @param backendReplicasInfo carries the backend id and its per-replica report entries
 */
public void replayBackendReplicasInfo(BackendReplicasInfo backendReplicasInfo) {
    final long reportingBackendId = backendReplicasInfo.getBackendId();

    for (BackendReplicasInfo.ReplicaReportInfo reportInfo : backendReplicasInfo.getReplicaReportInfos()) {
        Replica target = tabletInvertedIndex.getReplica(reportInfo.tabletId, reportingBackendId);
        if (target != null) {
            switch (reportInfo.type) {
                case MISSING_VERSION:
                    // Only the sign matters here: other checking logic treats any
                    // lastFailedVersion greater than 0 as "this replica is missing a
                    // version" and handles it accordingly, so the concrete value is arbitrary.
                    target.setLastFailedVersion(1L);
                    break;
                case BAD:
                    target.setBad(true);
                    break;
                default:
                    // Other report types require no change to the replica.
                    break;
            }
        } else {
            LOG.warn("failed to find replica of tablet {} on backend {} when replaying backend report info",
                    reportInfo.tabletId, reportingBackendId);
        }
    }
}

@Deprecated
public void replayBackendTabletsInfo(BackendTabletsInfo backendTabletsInfo) {
List<Pair<Long, Integer>> tabletsWithSchemaHash = backendTabletsInfo.getTabletSchemaHash();
if (!tabletsWithSchemaHash.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,8 @@ public void syncTableMetaData(EsRestClient client) {
esMetaStateTracker.run();
this.esTablePartitions = esMetaStateTracker.searchContext().tablePartitions();
} catch (Throwable e) {
LOG.warn("Exception happens when fetch index [{}] meta data from remote es cluster", this.name, e);
LOG.warn("Exception happens when fetch index [{}] meta data from remote es cluster." +
"table id: {}, err: {}", this.name, this.id, e.getMessage());
this.esTablePartitions = null;
this.lastMetaDataSyncException = e;
}
Expand Down
60 changes: 32 additions & 28 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public boolean canQuery() {
return this == NORMAL || this == SCHEMA_CHANGE;
}
}

public enum ReplicaStatus {
OK, // health
DEAD, // backend is not available
Expand All @@ -66,7 +66,7 @@ public enum ReplicaStatus {
SCHEMA_ERROR, // replica's schema hash does not equal to index's schema hash
BAD // replica is broken.
}

@SerializedName(value = "id")
private long id;
@SerializedName(value = "backendId")
Expand Down Expand Up @@ -100,7 +100,7 @@ public enum ReplicaStatus {
@SerializedName(value = "lastSuccessVersionHash")
private long lastSuccessVersionHash = 0L;

private volatile long versionCount = -1;
private volatile long versionCount = -1;

private long pathHash = -1;

Expand All @@ -114,7 +114,7 @@ public enum ReplicaStatus {
* So this replica need a further repair.
* If we do not do this, this replica will be treated as version stale, and will be removed,
* so that the balance task is failed, which is unexpected.
*
*
* furtherRepairSetTime is set along with needFurtherRepair.
* This is an insurance, in case that further repair task always fail. If 20 min passed
* since we set needFurtherRepair to true, the 'needFurtherRepair' will be set to false.
Expand All @@ -129,13 +129,13 @@ public enum ReplicaStatus {

// No-argument constructor: leaves every field at its declared initial value.
// The static read(DataInput) factory in this class creates an instance this way
// and then fills it via readFields(in).
public Replica() {
}

// for rollup
// the new replica's version is -1 and last failed version is -1
// Delegates to the full constructor: -1 is passed as the version, 0L for the two
// arguments that follow schemaHash, and -1 for the two arguments after state.
// NOTE(review): the full constructor's parameter list is only partly visible here;
// confirm which trailing position is lastFailedVersion and which is lastSuccessVersion.
public Replica(long replicaId, long backendId, int schemaHash, ReplicaState state) {
this(replicaId, backendId, -1, schemaHash, 0L, 0L, state, -1, -1);
}

// for create tablet and restore
public Replica(long replicaId, long backendId, ReplicaState state, long version, int schemaHash) {
this(replicaId, backendId, version, schemaHash, 0L, 0L, state, -1L, version);
Expand Down Expand Up @@ -166,7 +166,7 @@ public Replica(long replicaId, long backendId, long version, int schemaHash,
this.lastSuccessVersion = lastSuccessVersion;
}
}

/** Returns the current {@code version} of this replica. */
public long getVersion() {
    return version;
}
Expand All @@ -187,7 +187,7 @@ public long getId() {
/** Returns the {@code backendId} recorded for this replica. */
public long getBackendId() {
    return backendId;
}

/** Returns the {@code dataSize} recorded for this replica. */
public long getDataSize() {
    return this.dataSize;
}
Expand All @@ -199,11 +199,11 @@ public long getRowCount() {
/** Returns the {@code lastFailedVersion} recorded for this replica. */
public long getLastFailedVersion() {
    return this.lastFailedVersion;
}

/** Returns the {@code lastFailedTimestamp} recorded for this replica. */
public long getLastFailedTimestamp() {
    return this.lastFailedTimestamp;
}

/** Returns the {@code lastSuccessVersion} recorded for this replica. */
public long getLastSuccessVersion() {
    return this.lastSuccessVersion;
}
Expand Down Expand Up @@ -259,7 +259,7 @@ public synchronized void updateVersionInfo(long newVersion, long newDataSize, lo
/**
 * Updates the version, last failed version and last success version of this replica
 * while keeping its current {@code dataSize} and {@code rowCount}.
 *
 * <p>Forwards to {@code updateReplicaInfo}, passing the existing size and row count
 * fields unchanged.
 *
 * @param newVersion         version to hand to {@code updateReplicaInfo}
 * @param lastFailedVersion  last failed version to hand to {@code updateReplicaInfo}
 * @param lastSuccessVersion last success version to hand to {@code updateReplicaInfo}
 */
public synchronized void updateVersionWithFailedInfo(long newVersion, long lastFailedVersion, long lastSuccessVersion) {
    this.updateReplicaInfo(newVersion, lastFailedVersion, lastSuccessVersion, this.dataSize, this.rowCount);
}

public void updateVersionInfoForRecovery(
long newVersion,
long lastFailedVersion,
Expand All @@ -279,26 +279,26 @@ public void updateVersionInfoForRecovery(
/* last failed version: LFV
* last success version: LSV
* version: V
*
*
* Case 1:
* If LFV > LSV, set LSV back to V, which indicates that version between LSV and LFV is invalid.
* Clone task will clone the version between LSV and LFV
*
*
* Case 2:
* LFV changed, set LSV back to V. This is the same as Case 1, because LFV must be larger than LSV.
*
*
* Case 3:
* LFV remains unchanged, just update LSV, and then check if it falls into Case 1.
*
*
* Case 4:
* V is larger or equal to LFV, reset LFV. And if V is less than LSV, just set V to LSV. This may
* happen when a clone task finished and report version V, but the LSV is already larger than V,
* And we know that version between V and LSV is valid, so move V forward to LSV.
*
*
* Case 5:
* This is a bug case, I don't know why, may be some previous version introduce it. It looks like
* the V(hash) equals to LSV(hash), and V equals to LFV, but LFV hash is 0 or some unknown number.
* We just reset the LFV(hash) to recovery this replica.
* We just reset the LFV(hash) to recovery this replica.
*/
private void updateReplicaInfo(long newVersion,
long lastFailedVersion, long lastSuccessVersion,
Expand Down Expand Up @@ -335,14 +335,14 @@ private void updateReplicaInfo(long newVersion,
if (this.lastSuccessVersion <= this.lastFailedVersion) {
this.lastSuccessVersion = this.version;
}

// TODO: this case is unknown, add log to observe
if (this.version > lastFailedVersion && lastFailedVersion > 0) {
LOG.debug("current version {} is larger than last failed version {}, "
+ "maybe a fatal error or be report version, print a stack here ",
this.version, lastFailedVersion, new Exception());
}

if (lastFailedVersion != this.lastFailedVersion) {
// Case 2:
if (lastFailedVersion > this.lastFailedVersion) {
Expand All @@ -360,7 +360,7 @@ private void updateReplicaInfo(long newVersion,
this.lastSuccessVersion = this.version;
}
}

// Case 4:
if (this.version >= this.lastFailedVersion) {
this.lastFailedVersion = -1;
Expand Down Expand Up @@ -406,6 +406,10 @@ public boolean checkVersionCatchUp(long expectedVersion, boolean ignoreAlter) {
return true;
}

// Directly overwrites the lastFailedVersion field with the given value.
// Unlike updateVersionWithFailedInfo, this does not go through updateReplicaInfo,
// so lastSuccessVersion and version are left as they are.
public void setLastFailedVersion(long lastFailedVersion) {
this.lastFailedVersion = lastFailedVersion;
}

/**
 * Replaces the {@code state} of this replica.
 *
 * @param replicaState the new state to store
 */
public void setState(ReplicaState replicaState) {
    state = replicaState;
}
Expand Down Expand Up @@ -461,13 +465,13 @@ public void write(DataOutput out) throws IOException {
out.writeLong(dataSize);
out.writeLong(rowCount);
Text.writeString(out, state.name());

out.writeLong(lastFailedVersion);
out.writeLong(lastFailedVersionHash);
out.writeLong(lastSuccessVersion);
out.writeLong(lastSuccessVersionHash);
}

public void readFields(DataInput in) throws IOException {
id = in.readLong();
backendId = in.readLong();
Expand All @@ -481,13 +485,13 @@ public void readFields(DataInput in) throws IOException {
lastSuccessVersion = in.readLong();
lastSuccessVersionHash = in.readLong();
}

/**
 * Builds a {@code Replica} from serialized input.
 *
 * <p>Creates an empty instance with the no-argument constructor and populates it
 * through {@code readFields}.
 *
 * @param in source to read the replica's fields from
 * @return the populated replica
 * @throws IOException if {@code readFields} fails while reading from {@code in}
 */
public static Replica read(DataInput in) throws IOException {
    Replica loaded = new Replica();
    loaded.readFields(in);
    return loaded;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
Expand All @@ -496,13 +500,13 @@ public boolean equals(Object obj) {
if (!(obj instanceof Replica)) {
return false;
}

Replica replica = (Replica) obj;
return (id == replica.id)
&& (backendId == replica.backendId)
return (id == replica.id)
&& (backendId == replica.backendId)
&& (version == replica.version)
&& (dataSize == replica.dataSize)
&& (rowCount == replica.rowCount)
&& (rowCount == replica.rowCount)
&& (state.equals(replica.state))
&& (lastFailedVersion == replica.lastFailedVersion)
&& (lastSuccessVersion == replica.lastSuccessVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

package org.apache.doris.catalog;

import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.common.Config;
import org.apache.doris.thrift.TPartitionVersionInfo;
Expand All @@ -35,13 +32,16 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Table;
import com.google.common.collect.TreeMultimap;

import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -175,7 +175,7 @@ public void tabletReport(long backendId, Map<Long, TTablet> backendTablets,
replica.getId(), tabletId, backendId, replica,
backendTabletInfo.getVersion(),
backendTabletInfo.getSchemaHash(),
backendTabletInfo.isSetUsed() ? backendTabletInfo.isUsed() : "unknown",
backendTabletInfo.isSetUsed() ? !backendTabletInfo.isUsed() : "false",
backendTabletInfo.isSetVersionMiss() ? backendTabletInfo.isVersionMiss() : "unset");
synchronized (tabletRecoveryMap) {
tabletRecoveryMap.put(tabletMeta.getDbId(), tabletId);
Expand Down
Loading