From 202e6e3676c4a4adcc4bc78a65bf6f060f67e0be Mon Sep 17 00:00:00 2001 From: morningman Date: Wed, 23 Feb 2022 13:23:13 +0800 Subject: [PATCH 1/6] 1 --- .../org/apache/doris/catalog/EsTable.java | 3 +- .../external/elasticsearch/EsRestClient.java | 17 +- .../java/org/apache/doris/load/LoadJob.java | 173 +++++++++++------- 3 files changed, 116 insertions(+), 77 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java index 65a2f1d7b1ee3f..df19a86d96d4cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java @@ -429,7 +429,8 @@ public void syncTableMetaData(EsRestClient client) { esMetaStateTracker.run(); this.esTablePartitions = esMetaStateTracker.searchContext().tablePartitions(); } catch (Throwable e) { - LOG.warn("Exception happens when fetch index [{}] meta data from remote es cluster", this.name, e); + LOG.warn("Exception happens when fetch index [{}] meta data from remote es cluster." + + "table id: {}, err: {}", this.name, this.id, e.getMessage()); this.esTablePartitions = null; this.lastMetaDataSyncException = e; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java index 7e1983a625e180..467047ac8c3588 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/elasticsearch/EsRestClient.java @@ -41,6 +41,7 @@ import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; + import okhttp3.Credentials; import okhttp3.OkHttpClient; import okhttp3.Request; @@ -60,7 +61,7 @@ public class EsRestClient { private static OkHttpClient networkClient = new OkHttpClient.Builder() .readTimeout(10, TimeUnit.SECONDS) .build(); - + private static OkHttpClient sslNetworkClient; private Request.Builder builder; @@ -154,7 +155,7 @@ public EsShardPartitions searchShards(String indexName) throws DorisEsException } return EsShardPartitions.findShardPartitions(indexName, searchShards); } - + /** * init ssl networkClient use lazy way **/ @@ -217,7 +218,7 @@ private String execute(String path) throws DorisEsException { } selectNextNode(); } - LOG.warn("try all nodes [{}],no other nodes left", nodes); + LOG.warn("try all nodes [{}], no other nodes left", nodes); if (scratchExceptionForThrow != null) { throw scratchExceptionForThrow; } @@ -245,11 +246,15 @@ private T parseContent(String response, String key) { * support https **/ private static class TrustAllCerts implements X509TrustManager { - public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {} + public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { + } - public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {} + public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { + } - public X509Certificate[] getAcceptedIssuers() {return new X509Certificate[0];} + public X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[0]; + } } private static class TrustAllHostnameVerifier implements HostnameVerifier { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java index 680ff975505444..59e90ddf1036b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java @@ -107,7 +107,7 @@ public enum JobState { private EtlJobType etlJobType; private EtlJobInfo etlJobInfo; - + private Map idToTableLoadInfo; private Map idToTabletLoadInfo; private Set quorumTablets; @@ -115,14 +115,14 @@ public enum JobState { private List unfinishedTablets; private Set pushTasks; private Map replicaPersistInfos; - + private Map finishedReplicas; - + private List conditions = null; private DeleteInfo deleteInfo; private TResourceInfo resourceInfo; - + private TPriority priority; private long execMemLimit; @@ -137,15 +137,15 @@ public LoadJob() { public LoadJob(String label) { this(label, DEFAULT_TIMEOUT_S, Config.default_max_filter_ratio); } - + // convert an async delete job to load job - public LoadJob(long id, long dbId, long tableId, long partitionId, String label, - Map indexIdToSchemaHash, List deleteConditions, - DeleteInfo deleteInfo) { + public LoadJob(long id, long dbId, long tableId, long partitionId, String label, + Map indexIdToSchemaHash, List deleteConditions, + DeleteInfo deleteInfo) { this.id = id; this.dbId = dbId; this.tableId = tableId; - this.label = label; + this.label = label; this.transactionId = -1; this.timestamp = -1; this.timeoutSecond = DEFAULT_TIMEOUT_S; @@ -167,8 +167,10 @@ public LoadJob(long id, long dbId, long tableId, long partitionId, String label, hadoopEtlJobInfo.setEtlOutputDir(""); this.etlJobInfo = hadoopEtlJobInfo; this.etlJobInfo.setJobStatus(etlStatus); - this.idToTableLoadInfo = Maps.newHashMap();; - this.idToTabletLoadInfo = Maps.newHashMap();; + this.idToTableLoadInfo = Maps.newHashMap(); + ; + this.idToTabletLoadInfo = Maps.newHashMap(); + ; this.quorumTablets = new HashSet(); this.fullTablets = new HashSet(); this.unfinishedTablets = new ArrayList<>(); @@ -178,7 +180,7 @@ public LoadJob(long id, long dbId, long tableId, long partitionId, String label, this.priority = TPriority.NORMAL; this.execMemLimit = DEFAULT_EXEC_MEM_LIMIT; this.finishedReplicas = Maps.newHashMap(); - + // generate table load info PartitionLoadInfo partitionLoadInfo = new PartitionLoadInfo(null); Map idToPartitionLoadInfo = new HashMap<>(); @@ -186,7 +188,7 @@ public LoadJob(long id, long dbId, long tableId, long partitionId, String label, TableLoadInfo tableLoadInfo = new TableLoadInfo(idToPartitionLoadInfo); tableLoadInfo.addAllSchemaHash(indexIdToSchemaHash); idToTableLoadInfo.put(tableId, tableLoadInfo); - + // add delete conditions to load job this.conditions = deleteConditions; this.deleteInfo = deleteInfo; @@ -223,11 +225,11 @@ public LoadJob(String label, int timeoutSecond, double maxFilterRatio) { this.execMemLimit = DEFAULT_EXEC_MEM_LIMIT; this.finishedReplicas = Maps.newHashMap(); } - + public void addTableName(String tableName) { tableNames.add(tableName); } - + public Set getTableNames() { return tableNames; } @@ -253,11 +255,11 @@ public long getTableId() { public void setDbId(long dbId) { this.dbId = dbId; } - + public long getTransactionId() { return transactionId; } - + public void setTransactionId(long transactionId) { this.transactionId = transactionId; } @@ -273,15 +275,15 @@ public void setTimestamp(long timestamp) { public long getTimestamp() { return timestamp; } - + public void setTimeoutSecond(int timeoutSecond) { this.timeoutSecond = timeoutSecond; } - + public int getTimeoutSecond() { return timeoutSecond; } - + public void setMaxFilterRatio(double maxFilterRatio) { this.maxFilterRatio = maxFilterRatio; } @@ -309,7 +311,7 @@ public void setProgress(int progress) { public long getCreateTimeMs() { return createTimeMs; } - + public void setCreateTimeMs(long createTimeMs) { this.createTimeMs = createTimeMs; } @@ -370,7 +372,7 @@ public void setLoadFinishTimeMs(long loadFinishTimeMs) { break; } } - + public long getQuorumFinishTimeMs() { return quorumFinishTimeMs; } @@ -386,7 +388,7 @@ public FailMsg getFailMsg() { public void setFailMsg(FailMsg failMsg) { this.failMsg = failMsg; } - + public EtlJobType getEtlJobType() { return etlJobType; } @@ -407,9 +409,13 @@ public BrokerFileGroupAggInfo getPullLoadSourceInfo() { return pullLoadSourceInfo; } - public void setExecMemLimit(long execMemLimit) { this.execMemLimit = execMemLimit; } + public void setExecMemLimit(long execMemLimit) { + this.execMemLimit = execMemLimit; + } - public long getExecMemLimit() { return execMemLimit; } + public long getExecMemLimit() { + return execMemLimit; + } public void setEtlJobType(EtlJobType etlJobType) { this.etlJobType = etlJobType; @@ -484,17 +490,18 @@ public String getHadoopEtlJobId() { public void setHadoopEtlJobId(String etlJobId) { if (etlJobType == EtlJobType.HADOOP) { - ((HadoopEtlJobInfo) etlJobInfo).setEtlJobId(etlJobId);; + ((HadoopEtlJobInfo) etlJobInfo).setEtlJobId(etlJobId); + ; } } - + public Map getMiniEtlTasks() { if (etlJobType == EtlJobType.MINI) { return ((MiniEtlJobInfo) etlJobInfo).getEtlTasks(); } return null; } - + public MiniEtlTaskInfo getMiniEtlTask(long taskId) { if (etlJobType == EtlJobType.MINI) { return ((MiniEtlJobInfo) etlJobInfo).getEtlTask(taskId); @@ -507,7 +514,7 @@ public void setMiniEtlTasks(Map idToEtlTask) { ((MiniEtlJobInfo) etlJobInfo).setEtlTasks(idToEtlTask); } } - + public boolean miniNeedGetTaskStatus() { if (etlJobType == EtlJobType.MINI) { return ((MiniEtlJobInfo) etlJobInfo).needGetTaskStatus(); @@ -518,15 +525,15 @@ public boolean miniNeedGetTaskStatus() { public EtlStatus getEtlJobStatus() { return etlJobInfo.getJobStatus(); } - + public void setEtlJobStatus(EtlStatus etlStatus) { etlJobInfo.setJobStatus(etlStatus); } - + public Map getIdToTableLoadInfo() { return idToTableLoadInfo; } - + public TableLoadInfo getTableLoadInfo(long tableId) { return idToTableLoadInfo.get(tableId); } @@ -555,7 +562,7 @@ public List getAllTableIds() { public Map getIdToTabletLoadInfo() { return idToTabletLoadInfo; } - + public TabletLoadInfo getTabletLoadInfo(long tabletId) { return idToTabletLoadInfo.get(tabletId); } @@ -563,19 +570,19 @@ public TabletLoadInfo getTabletLoadInfo(long tabletId) { public void setIdToTabletLoadInfo(Map idTotabletLoadInfo) { this.idToTabletLoadInfo = idTotabletLoadInfo; } - + public void addQuorumTablet(long tabletId) { quorumTablets.add(tabletId); } - + public Set getQuorumTablets() { return quorumTablets; } - + public void clearQuorumTablets() { quorumTablets.clear(); } - + public void addFullTablet(long tabletId) { fullTablets.add(tabletId); } @@ -583,24 +590,24 @@ public void addFullTablet(long tabletId) { public Set getFullTablets() { return fullTablets; } - + public void setUnfinishedTablets(Set unfinishedTablets) { this.unfinishedTablets.clear(); this.unfinishedTablets.addAll(unfinishedTablets); } - + public void addPushTask(PushTask pushTask) { pushTasks.add(pushTask); } - + public Set getPushTasks() { return pushTasks; } - + public Map getReplicaPersistInfos() { return this.replicaPersistInfos; } - + public void addReplicaPersistInfos(ReplicaPersistInfo info) { if (!replicaPersistInfos.containsKey(info.getReplicaId())) { replicaPersistInfos.put(info.getReplicaId(), info); @@ -614,16 +621,16 @@ public void setResourceInfo(TResourceInfo resourceInfo) { public TResourceInfo getResourceInfo() { return resourceInfo; } - + public boolean addFinishedReplica(Replica replica) { finishedReplicas.put(replica.getId(), replica); return true; } - + public boolean isReplicaFinished(long replicaId) { return finishedReplicas.containsKey(replicaId); } - + public Collection getFinishedReplicas() { return finishedReplicas.values(); } @@ -631,24 +638,24 @@ public Collection getFinishedReplicas() { public List getConditions() { return conditions; } - + public boolean isSyncDeleteJob() { if (conditions != null) { return true; } return false; } - + public DeleteInfo getDeleteInfo() { return deleteInfo; } - + public long getDeleteJobTimeout() { // timeout is between 30 seconds to 5 min long timeout = Math.max(idToTabletLoadInfo.size() * Config.tablet_delete_timeout_second * 1000L, 30000L); return Math.min(timeout, Config.load_straggler_wait_second * 1000L); } - + @Override public String toString() { return "LoadJob [id=" + id + ", dbId=" + dbId + ", label=" + label + ", timeoutSecond=" + timeoutSecond @@ -656,9 +663,9 @@ public String toString() { + ", progress=" + progress + ", createTimeMs=" + createTimeMs + ", etlStartTimeMs=" + etlStartTimeMs + ", etlFinishTimeMs=" + etlFinishTimeMs + ", loadStartTimeMs=" + loadStartTimeMs + ", loadFinishTimeMs=" + loadFinishTimeMs + ", failMsg=" + failMsg + ", etlJobType=" + etlJobType - + ", etlJobInfo=" + etlJobInfo + ", priority=" + priority + ", transactionId=" + transactionId - + ", quorumFinishTimeMs=" + quorumFinishTimeMs - + ", unfinished tablets=[" + this.unfinishedTablets.subList(0, Math.min(3, this.unfinishedTablets.size())) + "]" + + ", etlJobInfo=" + etlJobInfo + ", priority=" + priority + ", transactionId=" + transactionId + + ", quorumFinishTimeMs=" + quorumFinishTimeMs + + ", unfinished tablets=[" + this.unfinishedTablets.subList(0, Math.min(3, this.unfinishedTablets.size())) + "]" + "]"; } @@ -668,22 +675,22 @@ public void clearRedundantInfoForHistoryJob() { idToTableLoadInfo.clear(); idToTableLoadInfo = null; } - + if (idToTabletLoadInfo != null) { idToTabletLoadInfo.clear(); idToTabletLoadInfo = null; } - + if (quorumTablets != null) { quorumTablets.clear(); quorumTablets = null; } - + if (fullTablets != null) { fullTablets.clear(); fullTablets = null; } - + if (replicaPersistInfos != null) { replicaPersistInfos.clear(); replicaPersistInfos = null; @@ -717,7 +724,7 @@ public void write(DataOutput out) throws IOException { out.writeLong(createTimeMs); out.writeLong(etlStartTimeMs); out.writeLong(etlFinishTimeMs); - out.writeLong(loadStartTimeMs); + out.writeLong(loadStartTimeMs); out.writeLong(loadFinishTimeMs); failMsg.write(out); Text.writeString(out, etlJobType.name()); @@ -735,7 +742,7 @@ public void write(DataOutput out) throws IOException { entry.getValue().write(out); } } - + if (idToTabletLoadInfo == null) { out.writeBoolean(false); } else { @@ -747,7 +754,7 @@ public void write(DataOutput out) throws IOException { entry.getValue().write(out); } } - + if (fullTablets == null) { out.writeBoolean(false); } else { @@ -758,7 +765,7 @@ public void write(DataOutput out) throws IOException { out.writeLong(id); } } - + if (replicaPersistInfos == null) { out.writeBoolean(false); } else { @@ -799,7 +806,7 @@ public void write(DataOutput out) throws IOException { out.writeLong(execMemLimit); out.writeLong(transactionId); - + if (conditions != null) { out.writeBoolean(true); count = conditions.size(); @@ -853,7 +860,7 @@ public void readFields(DataInput in) throws IOException { timestamp = in.readLong(); timeoutSecond = in.readInt(); maxFilterRatio = in.readDouble(); - + boolean deleteFlag = false; deleteFlag = in.readBoolean(); @@ -882,7 +889,7 @@ public void readFields(DataInput in) throws IOException { long key = in.readLong(); TableLoadInfo value = new TableLoadInfo(); value.readFields(in); - idToTableLoadInfo.put(key, value); + idToTableLoadInfo.put(key, value); } } @@ -896,7 +903,7 @@ public void readFields(DataInput in) throws IOException { idToTabletLoadInfo.put(key, tLoadInfo); } } - + if (in.readBoolean()) { count = in.readInt(); fullTablets = new HashSet(); @@ -905,7 +912,7 @@ public void readFields(DataInput in) throws IOException { fullTablets.add(id); } } - + if (in.readBoolean()) { count = in.readInt(); replicaPersistInfos = Maps.newHashMap(); @@ -939,6 +946,7 @@ public void readFields(DataInput in) throws IOException { this.pullLoadSourceInfo = BrokerFileGroupAggInfo.read(in); } +<<<<<<< HEAD this.execMemLimit = in.readLong(); this.transactionId = in.readLong(); if (in.readBoolean()) { @@ -954,6 +962,31 @@ public void readFields(DataInput in) throws IOException { predicate = new IsNullPredicate(new SlotRef(null, key), true); } else { predicate = new IsNullPredicate(new SlotRef(null, key), true); +======= + if (version >= FeMetaVersion.VERSION_45) { + this.transactionId = in.readLong(); + if (in.readBoolean()) { + count = in.readInt(); + conditions = Lists.newArrayList(); + for (int i = 0; i < count; i++) { + String key = Text.readString(in); + String opStr = Text.readString(in); + if (opStr.equalsIgnoreCase("IS")) { + String value = Text.readString(in); + IsNullPredicate predicate; + if (value.equalsIgnoreCase("NOT NULL")) { + predicate = new IsNullPredicate(new SlotRef(null, key), true); + } else { + predicate = new IsNullPredicate(new SlotRef(null, key), true); + } + conditions.add(predicate); + } else { + Operator op = Operator.valueOf(opStr); + String value = Text.readString(in); + BinaryPredicate predicate = new BinaryPredicate(op, new SlotRef(null, key), + new StringLiteral(value)); + conditions.add(predicate); +>>>>>>> 1 } conditions.add(predicate); } else { @@ -974,19 +1007,19 @@ public void readFields(DataInput in) throws IOException { tableNames.add(Text.readString(in)); } } - + @Override public boolean equals(Object obj) { if (obj == this) { return true; } - + if (!(obj instanceof LoadJob)) { return false; } - + LoadJob job = (LoadJob) obj; - + if (this.id == job.id) { return true; } @@ -996,6 +1029,6 @@ public boolean equals(Object obj) { // Return true if this job is finished for a long time public boolean isExpired(long currentTimeMs) { return (getState() == JobState.FINISHED || getState() == JobState.CANCELLED) - && (currentTimeMs - getLoadFinishTimeMs()) / 1000 > Config.label_keep_max_second; + && (currentTimeMs - getLoadFinishTimeMs()) / 1000 > Config.label_keep_max_second; } } From 013d3b5402b0072fef3dcf33d8c9054f9cebefe4 Mon Sep 17 00:00:00 2001 From: morningman Date: Wed, 23 Feb 2022 13:48:06 +0800 Subject: [PATCH 2/6] 2 --- be/src/olap/tablet.cpp | 24 ++++++++++++------------ be/src/olap/tablet.h | 8 ++++++-- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index f9856d72c4ae89..861662c65715ce 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -552,10 +552,11 @@ bool Tablet::_reconstruct_version_tracker_if_necessary() { } OLAPStatus Tablet::capture_consistent_versions(const Version& spec_version, - std::vector* version_path) const { + std::vector* version_path, + bool quite) const { OLAPStatus status = _timestamped_version_tracker.capture_consistent_versions(spec_version, version_path); - if (status != OLAP_SUCCESS) { + if (status != OLAP_SUCCESS && !quite) { std::vector missed_versions; calc_missed_versions_unlocked(spec_version.second, &missed_versions); if (missed_versions.empty()) { @@ -577,9 +578,9 @@ OLAPStatus Tablet::capture_consistent_versions(const Version& spec_version, return status; } -OLAPStatus Tablet::check_version_integrity(const Version& version) { +OLAPStatus Tablet::check_version_integrity(const Version& version, bool quite) { ReadLock rdlock(&_meta_lock); - return capture_consistent_versions(version, nullptr); + return capture_consistent_versions(version, nullptr, quite); } // If any rowset contains the specific version, it means the version already exist @@ -1245,14 +1246,11 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info) { tablet_info->schema_hash = _tablet_meta->schema_hash(); tablet_info->row_count = _tablet_meta->num_rows(); tablet_info->data_size = _tablet_meta->tablet_footprint(); - Version version = {-1, 0}; - _max_continuous_version_from_beginning_unlocked(&version); + + tablet_info->__set_version_miss(false); + // _max_continuous_version_from_beginning_unlocked(&version); auto max_rowset = rowset_with_max_version(); - if (max_rowset != nullptr) { - if (max_rowset->version() != version) { - tablet_info->__set_version_miss(true); - } - } else { + if (max_rowset == nullptr) { // If the tablet is in running state, it must not be doing schema-change. so if we can not // access its rowsets, it means that the tablet is bad and needs to be reported to the FE // for subsequent repairs (through the cloning task) @@ -1263,8 +1261,10 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info) { // still sets the state to normal when reporting. Note that every task has an timeout, // so if the task corresponding to this change hangs, when the task timeout, FE will know // and perform state modification operations. + } else { + tablet_info->__set_version_miss(check_version_integrity({0, max_rowset->version().second}, true)); } - tablet_info->version = version.second; + tablet_info->version = max_rowset->version().second; // Useless but it is a required filed in TTabletInfo tablet_info->version_hash = 0; tablet_info->__set_partition_id(_tablet_meta->partition_id()); diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 04291363a3fff6..c282c315a517af 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -115,9 +115,13 @@ class Tablet : public BaseTablet { /// need to delete flag. void delete_expired_stale_rowset(); + // Given spec_version, find a continuous version path and store it in version_path. + // If quite is true, then only "does this path exist" is returned. OLAPStatus capture_consistent_versions(const Version& spec_version, - std::vector* version_path) const; - OLAPStatus check_version_integrity(const Version& version); + std::vector* version_path, + bool quite = false) const; + // if quite is true, no error log will be printed if there are missing versions + OLAPStatus check_version_integrity(const Version& version, bool quite = false); bool check_version_exist(const Version& version) const; void acquire_version_and_rowsets( std::vector>* version_rowsets) const; From f2564f2baba9bec6c4e75475ee9bdf61684d55b7 Mon Sep 17 00:00:00 2001 From: morningman Date: Wed, 23 Feb 2022 17:14:28 +0800 Subject: [PATCH 3/6] 2 --- .../org/apache/doris/catalog/Catalog.java | 33 +++++++ .../org/apache/doris/catalog/Replica.java | 56 +++++------ .../doris/catalog/TabletInvertedIndex.java | 10 +- .../apache/doris/journal/JournalEntity.java | 6 ++ .../apache/doris/master/ReportHandler.java | 8 +- .../doris/persist/BackendReplicasInfo.java | 99 +++++++++++++++++++ .../doris/persist/BackendTabletsInfo.java | 2 + .../org/apache/doris/persist/EditLog.java | 9 ++ .../apache/doris/persist/OperationType.java | 2 + 9 files changed, 187 insertions(+), 38 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/persist/BackendReplicasInfo.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 19552ad0c416a4..0b42d3f8bb524d 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -188,6 +188,7 @@ import org.apache.doris.mysql.privilege.PaloAuth; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.BackendIdsUpdateInfo; +import org.apache.doris.persist.BackendReplicasInfo; import org.apache.doris.persist.BackendTabletsInfo; import org.apache.doris.persist.ClusterInfo; import org.apache.doris.persist.ColocatePersistInfo; @@ -745,12 +746,15 @@ public Checkpoint getCheckpointer() { public StatisticsManager getStatisticsManager() { return statisticsManager; } + public StatisticsJobManager getStatisticsJobManager() { return statisticsJobManager; } + public StatisticsJobScheduler getStatisticsJobScheduler() { return statisticsJobScheduler; } + public StatisticsTaskScheduler getStatisticsTaskScheduler() { return statisticsTaskScheduler; } @@ -6929,6 +6933,35 @@ public void setConfig(AdminSetConfigStmt stmt) throws DdlException { } } + public void replayBackendReplicasInfo(BackendReplicasInfo backendReplicasInfo) { + long backendId = backendReplicasInfo.getBackendId(); + List replicaInfos = backendReplicasInfo.getReplicaReportInfos(); + + for (BackendReplicasInfo.ReplicaReportInfo info : replicaInfos) { + Replica replica = tabletInvertedIndex.getReplica(info.tabletId, backendId); + if (replica == null) { + LOG.warn("failed to find replica of tablet {} on backend {} when replaying backend report info", + info.tabletId, backendId); + continue; + } + + switch (info.type) { + case BAD: + replica.setBad(true); + break; + case MISSING_VERSION: + // The absolute value is meaningless, as long as it is greater than 0. + // This way, in other checking logic, if lastFailedVersion is found to be greater than 0, + // it will be considered a version missing replica and will be handled accordingly. + replica.setLastFailedVersion(1L); + break; + default: + break; + } + } + } + + @Deprecated public void replayBackendTabletsInfo(BackendTabletsInfo backendTabletsInfo) { List> tabletsWithSchemaHash = backendTabletsInfo.getTabletSchemaHash(); if (!tabletsWithSchemaHash.isEmpty()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java index b6c38a4ac6ad33..0198955c5a1367 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java @@ -57,7 +57,7 @@ public boolean canQuery() { return this == NORMAL || this == SCHEMA_CHANGE; } } - + public enum ReplicaStatus { OK, // health DEAD, // backend is not available @@ -66,7 +66,7 @@ public enum ReplicaStatus { SCHEMA_ERROR, // replica's schema hash does not equal to index's schema hash BAD // replica is broken. } - + @SerializedName(value = "id") private long id; @SerializedName(value = "backendId") @@ -100,7 +100,7 @@ public enum ReplicaStatus { @SerializedName(value = "lastSuccessVersionHash") private long lastSuccessVersionHash = 0L; - private volatile long versionCount = -1; + private volatile long versionCount = -1; private long pathHash = -1; @@ -114,7 +114,7 @@ public enum ReplicaStatus { * So this replica need a further repair. * If we do not do this, this replica will be treated as version stale, and will be removed, * so that the balance task is failed, which is unexpected. - * + * * furtherRepairSetTime set alone with needFurtherRepair. * This is an insurance, in case that further repair task always fail. If 20 min passed * since we set needFurtherRepair to true, the 'needFurtherRepair' will be set to false. @@ -129,13 +129,13 @@ public enum ReplicaStatus { public Replica() { } - + // for rollup // the new replica's version is -1 and last failed version is -1 public Replica(long replicaId, long backendId, int schemaHash, ReplicaState state) { this(replicaId, backendId, -1, schemaHash, 0L, 0L, state, -1, -1); } - + // for create tablet and restore public Replica(long replicaId, long backendId, ReplicaState state, long version, int schemaHash) { this(replicaId, backendId, version, schemaHash, 0L, 0L, state, -1L, version); @@ -166,7 +166,7 @@ public Replica(long replicaId, long backendId, long version, int schemaHash, this.lastSuccessVersion = lastSuccessVersion; } } - + public long getVersion() { return this.version; } @@ -187,7 +187,7 @@ public long getId() { public long getBackendId() { return this.backendId; } - + public long getDataSize() { return dataSize; } @@ -199,11 +199,11 @@ public long getRowCount() { public long getLastFailedVersion() { return lastFailedVersion; } - + public long getLastFailedTimestamp() { return lastFailedTimestamp; } - + public long getLastSuccessVersion() { return lastSuccessVersion; } @@ -259,7 +259,7 @@ public synchronized void updateVersionInfo(long newVersion, long newDataSize, lo public synchronized void updateVersionWithFailedInfo(long newVersion, long lastFailedVersion, long lastSuccessVersion) { updateReplicaInfo(newVersion, lastFailedVersion, lastSuccessVersion, dataSize, rowCount); } - + public void updateVersionInfoForRecovery( long newVersion, long lastFailedVersion, @@ -279,26 +279,26 @@ public void updateVersionInfoForRecovery( /* last failed version: LFV * last success version: LSV * version: V - * + * * Case 1: * If LFV > LSV, set LSV back to V, which indicates that version between LSV and LFV is invalid. * Clone task will clone the version between LSV and LFV - * + * * Case 2: * LFV changed, set LSV back to V. This is just same as Case 1. Cause LFV must large than LSV. - * + * * Case 3: * LFV remains unchanged, just update LSV, and then check if it falls into Case 1. - * + * * Case 4: * V is larger or equal to LFV, reset LFV. And if V is less than LSV, just set V to LSV. This may * happen when a clone task finished and report version V, but the LSV is already larger than V, * And we know that version between V and LSV is valid, so move V forward to LSV. - * + * * Case 5: * This is a bug case, I don't know why, may be some previous version introduce it. It looks like * the V(hash) equals to LSV(hash), and V equals to LFV, but LFV hash is 0 or some unknown number. - * We just reset the LFV(hash) to recovery this replica. + * We just reset the LFV(hash) to recovery this replica. */ private void updateReplicaInfo(long newVersion, long lastFailedVersion, long lastSuccessVersion, @@ -335,14 +335,14 @@ private void updateReplicaInfo(long newVersion, if (this.lastSuccessVersion <= this.lastFailedVersion) { this.lastSuccessVersion = this.version; } - + // TODO: this case is unknown, add log to observe if (this.version > lastFailedVersion && lastFailedVersion > 0) { LOG.debug("current version {} is larger than last failed version {}, " + "maybe a fatal error or be report version, print a stack here ", this.version, lastFailedVersion, new Exception()); } - + if (lastFailedVersion != this.lastFailedVersion) { // Case 2: if (lastFailedVersion > this.lastFailedVersion) { @@ -360,7 +360,7 @@ private void updateReplicaInfo(long newVersion, this.lastSuccessVersion = this.version; } } - + // Case 4: if (this.version >= this.lastFailedVersion) { this.lastFailedVersion = -1; @@ -461,13 +461,13 @@ public void write(DataOutput out) throws IOException { out.writeLong(dataSize); out.writeLong(rowCount); Text.writeString(out, state.name()); - + out.writeLong(lastFailedVersion); out.writeLong(lastFailedVersionHash); out.writeLong(lastSuccessVersion); out.writeLong(lastSuccessVersionHash); } - + public void readFields(DataInput in) throws IOException { id = in.readLong(); backendId = in.readLong(); @@ -481,13 +481,13 @@ public void readFields(DataInput in) throws IOException { lastSuccessVersion = in.readLong(); lastSuccessVersionHash = in.readLong(); } - + public static Replica read(DataInput in) throws IOException { Replica replica = new Replica(); replica.readFields(in); return replica; } - + @Override public boolean equals(Object obj) { if (this == obj) { @@ -496,13 +496,13 @@ public boolean equals(Object obj) { if (!(obj instanceof Replica)) { return false; } - + Replica replica = (Replica) obj; - return (id == replica.id) - && (backendId == replica.backendId) + return (id == replica.id) + && (backendId == replica.backendId) && (version == replica.version) && (dataSize == replica.dataSize) - && (rowCount == replica.rowCount) + && (rowCount == replica.rowCount) && (state.equals(replica.state)) && (lastFailedVersion == replica.lastFailedVersion) && (lastSuccessVersion == replica.lastSuccessVersion); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index 2d572685c5561d..3753ba5f836625 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -17,9 +17,6 @@ package org.apache.doris.catalog; -import com.google.common.collect.ImmutableSet; -import org.apache.commons.lang3.tuple.ImmutableTriple; -import org.apache.commons.lang3.tuple.Triple; import org.apache.doris.catalog.Replica.ReplicaState; import org.apache.doris.common.Config; import org.apache.doris.thrift.TPartitionVersionInfo; @@ -35,13 +32,16 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.HashBasedTable; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; -import com.google.common.collect.Ordering; import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; import com.google.common.collect.Table; import com.google.common.collect.TreeMultimap; +import org.apache.commons.lang3.tuple.ImmutableTriple; +import org.apache.commons.lang3.tuple.Triple; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -175,7 +175,7 @@ public void tabletReport(long backendId, Map backendTablets, replica.getId(), tabletId, backendId, replica, backendTabletInfo.getVersion(), backendTabletInfo.getSchemaHash(), - backendTabletInfo.isSetUsed() ? backendTabletInfo.isUsed() : "unknown", + backendTabletInfo.isSetUsed() ? !backendTabletInfo.isUsed() : "false", backendTabletInfo.isSetVersionMiss() ? backendTabletInfo.isVersionMiss() : "unset"); synchronized (tabletRecoveryMap) { tabletRecoveryMap.put(tabletMeta.getDbId(), tabletId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index 801e75d6c798a4..e35011ce76949d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -51,6 +51,7 @@ import org.apache.doris.persist.AlterRoutineLoadJobOperationLog; import org.apache.doris.persist.AlterViewInfo; import org.apache.doris.persist.BackendIdsUpdateInfo; +import org.apache.doris.persist.BackendReplicasInfo; import org.apache.doris.persist.BackendTabletsInfo; import org.apache.doris.persist.BatchDropInfo; import org.apache.doris.persist.BatchModifyPartitionsInfo; @@ -486,6 +487,11 @@ public void readFields(DataInput in) throws IOException { isRead = true; break; } + case OperationType.OP_BACKEND_REPLICAS_INFO: { + data = BackendReplicasInfo.read(in); + isRead = true; + break; + } case OperationType.OP_CREATE_ROUTINE_LOAD_JOB: { data = RoutineLoadJob.read(in); isRead = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 02b9ab28bea579..6e374f5757f071 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -42,6 +42,7 @@ import org.apache.doris.metric.GaugeMetric; import org.apache.doris.metric.Metric.MetricUnit; import org.apache.doris.metric.MetricRepo; +import org.apache.doris.persist.BackendReplicasInfo; import org.apache.doris.persist.BackendTabletsInfo; import org.apache.doris.persist.ReplicaPersistInfo; import org.apache.doris.system.Backend; @@ -797,8 +798,7 @@ private static void handleRecoverTablet(ListMultimap tabletRecoveryM tabletRecoveryMap.size(), backendId); TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex(); - BackendTabletsInfo backendTabletsInfo = new BackendTabletsInfo(backendId); - backendTabletsInfo.setBad(true); + BackendReplicasInfo backendReplicasInfo = new BackendReplicasInfo(backendId); for (Long dbId : tabletRecoveryMap.keySet()) { Database db = Catalog.getCurrentCatalog().getDbNullable(dbId); if (db == null) { @@ -848,9 +848,7 @@ private static void handleRecoverTablet(ListMultimap tabletRecoveryM if (replica.setBad(true)) { LOG.warn("set bad for replica {} of tablet {} on backend {}", replica.getId(), tabletId, backendId); - ReplicaPersistInfo replicaPersistInfo = ReplicaPersistInfo.createForReport( - dbId, tableId, partitionId, indexId, tabletId, backendId, replica.getId()); - backendTabletsInfo.addReplicaInfo(replicaPersistInfo); + backendReplicasInfo.addBadReplica(tabletId); } break; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BackendReplicasInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/BackendReplicasInfo.java new file mode 100644 index 00000000000000..00d1551a96eff6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BackendReplicasInfo.java @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.persist; + +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.common.collect.Lists; +import com.google.gson.annotations.SerializedName; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; + +// This class is used to record the replica information that needs to be persisted +// and synchronized to other FEs when BE reports tablet, +// such as bad replica or missing version replica information, etc. +public class BackendReplicasInfo implements Writable { + + private long backendId; + private List replicaReportInfos = Lists.newArrayList(); + + public BackendReplicasInfo(long backendId) { + this.backendId = backendId; + } + + public void addBadReplica(long tabletId) { + replicaReportInfos.add(new ReplicaReportInfo(tabletId, ReportInfoType.BAD)); + } + + public void addMissingVersionReplica(long tabletId) { + replicaReportInfos.add(new ReplicaReportInfo(tabletId, ReportInfoType.MISSING_VERSION)); + } + + public long getBackendId() { + return backendId; + } + + public List getReplicaReportInfos() { + return replicaReportInfos; + } + + public static BackendReplicasInfo read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, BackendReplicasInfo.class); + } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } + + public enum ReportInfoType { + BAD, + MISSING_VERSION + } + + public static class ReplicaReportInfo implements Writable { + @SerializedName(value = "tabletId") + public long tabletId; + @SerializedName(value = "type") + public ReportInfoType type; + + public ReplicaReportInfo(long tabletId, ReportInfoType type) { + this.tabletId = tabletId; + this.type = type; + } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } + + public static ReplicaReportInfo read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, ReplicaReportInfo.class); + } + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BackendTabletsInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/BackendTabletsInfo.java index 5182436e8f4c19..23c0a98725dd73 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/BackendTabletsInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BackendTabletsInfo.java @@ -27,6 +27,8 @@ import java.io.IOException; import java.util.List; +@Deprecated +// replaced by BackendReplicaInfo public class BackendTabletsInfo implements Writable { private long backendId; diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 3ce8460a840fc5..eb8bf6b6c27802 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -621,6 +621,11 @@ public static void loadJournal(Catalog catalog, JournalEntity journal) { Catalog.getCurrentCatalog().replayBackendTabletsInfo(backendTabletsInfo); break; } + case OperationType.OP_BACKEND_REPLICAS_INFO: { + BackendReplicasInfo backendReplicasInfo = (BackendReplicasInfo) journal.getData(); + Catalog.getCurrentCatalog().replayBackendReplicasInfo(backendReplicasInfo); + break; + } case OperationType.OP_CREATE_ROUTINE_LOAD_JOB: { RoutineLoadJob routineLoadJob = (RoutineLoadJob) journal.getData(); Catalog.getCurrentCatalog().getRoutineLoadManager().replayCreateRoutineLoadJob(routineLoadJob); @@ -1257,6 +1262,10 @@ public void logBackendTabletsInfo(BackendTabletsInfo backendTabletsInfo) { logEdit(OperationType.OP_BACKEND_TABLETS_INFO, backendTabletsInfo); } + public void logBackendReplicasInfo(BackendReplicasInfo backendReplicasInfo) { + logEdit(OperationType.OP_BACKEND_REPLICAS_INFO, backendReplicasInfo); + } + public void logCreateRoutineLoadJob(RoutineLoadJob routineLoadJob) { logEdit(OperationType.OP_CREATE_ROUTINE_LOAD_JOB, routineLoadJob); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java index 2a5f3a23143223..bb5aaa971edadf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java @@ -88,8 +88,10 @@ public class OperationType { @Deprecated public static final short OP_FINISH_ASYNC_DELETE = 44; public static final short OP_UPDATE_REPLICA = 45; + @Deprecated public static final short OP_BACKEND_TABLETS_INFO = 46; public static final short OP_SET_REPLICA_STATUS = 47; + public static final short OP_BACKEND_REPLICAS_INFO = 48; public static final short OP_ADD_BACKEND = 50; public static final short OP_DROP_BACKEND = 51; From 934cc6109a8cc1489617f885165bc1a9ce101178 Mon Sep 17 00:00:00 2001 From: morningman Date: Wed, 23 Feb 2022 18:06:00 +0800 Subject: [PATCH 4/6] 3 --- be/src/olap/tablet.cpp | 1 - .../java/org/apache/doris/load/LoadJob.java | 2 - .../apache/doris/master/ReportHandler.java | 29 ++----- .../doris/persist/BackendReplicasInfo.java | 6 ++ .../org/apache/doris/persist/EditLog.java | 1 + .../persist/BackendReplicaInfosTest.java | 81 +++++++++++++++++++ 6 files changed, 96 insertions(+), 24 deletions(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/persist/BackendReplicaInfosTest.java diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 861662c65715ce..a359fbe572b8e3 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1248,7 +1248,6 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info) { tablet_info->data_size = _tablet_meta->tablet_footprint(); tablet_info->__set_version_miss(false); - // _max_continuous_version_from_beginning_unlocked(&version); auto max_rowset = rowset_with_max_version(); if (max_rowset == nullptr) { // If the tablet is in running state, it must not be doing schema-change. so if we can not diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java index 59e90ddf1036b3..d3db3b4e8abdcc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java @@ -168,9 +168,7 @@ public LoadJob(long id, long dbId, long tableId, long partitionId, String label, this.etlJobInfo = hadoopEtlJobInfo; this.etlJobInfo.setJobStatus(etlStatus); this.idToTableLoadInfo = Maps.newHashMap(); - ; this.idToTabletLoadInfo = Maps.newHashMap(); - ; this.quorumTablets = new HashSet(); this.fullTablets = new HashSet(); this.unfinishedTablets = new ArrayList<>(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 6e374f5757f071..ea4abd9c09f6ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -853,25 +853,12 @@ private static void handleRecoverTablet(ListMultimap tabletRecoveryM break; } - if (replica.getVersion() > tTabletInfo.getVersion()) { - LOG.warn("recover for replica {} of tablet {} on backend {}", - replica.getId(), tabletId, backendId); - if (replica.getVersion() == tTabletInfo.getVersion() + 1) { - // this missing version is the last version of this replica - replica.updateVersionInfoForRecovery( - tTabletInfo.getVersion(), /* set version to BE report version */ - replica.getVersion(), /* set LFV to current FE version */ - tTabletInfo.getVersion() /* set LSV to BE report version */ - ); - } else { - // this missing version is a hole - replica.updateVersionInfoForRecovery( - tTabletInfo.getVersion(), /* set version to BE report version */ - tTabletInfo.getVersion() + 1, /* LFV */ - /* remain LSV unchanged, which should be equal to replica.version */ - replica.getLastSuccessVersion()); - } - // no need to write edit log, if FE crashed, this will be recovered again + if (tTabletInfo.isSetVersionMiss() && tTabletInfo.isVersionMiss()) { + // The absolute value is meaningless, as long as it is greater than 0. + // This way, in other checking logic, if lastFailedVersion is found to be greater than 0, + // it will be considered a version missing replica and will be handled accordingly. + replica.setLastFailedVersion(1L); + backendReplicasInfo.addMissingVersionReplica(tabletId); break; } } @@ -882,9 +869,9 @@ private static void handleRecoverTablet(ListMultimap tabletRecoveryM } } // end for recovery map - if (!backendTabletsInfo.isEmpty()) { + if (!backendReplicasInfo.isEmpty()) { // need to write edit log the sync the bad info to other FEs - Catalog.getCurrentCatalog().getEditLog().logBackendTabletsInfo(backendTabletsInfo); + Catalog.getCurrentCatalog().getEditLog().logBackendReplicasInfo(backendReplicasInfo); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BackendReplicasInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/BackendReplicasInfo.java index 00d1551a96eff6..c382e1fdb5847f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/BackendReplicasInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BackendReplicasInfo.java @@ -34,7 +34,9 @@ // such as bad replica or missing version replica information, etc. public class BackendReplicasInfo implements Writable { + @SerializedName(value = "backendId") private long backendId; + @SerializedName(value = "replicaReportInfos") private List replicaReportInfos = Lists.newArrayList(); public BackendReplicasInfo(long backendId) { @@ -57,6 +59,10 @@ public List getReplicaReportInfos() { return replicaReportInfos; } + public boolean isEmpty() { + return replicaReportInfos.isEmpty(); + } + public static BackendReplicasInfo read(DataInput in) throws IOException { String json = Text.readString(in); return GsonUtils.GSON.fromJson(json, BackendReplicasInfo.class); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index eb8bf6b6c27802..574c77e156e58b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -1258,6 +1258,7 @@ public void logDropEncryptKey(EncryptKeySearchDesc desc) { logEdit(OperationType.OP_DROP_ENCRYPTKEY, desc); } + @Deprecated public void logBackendTabletsInfo(BackendTabletsInfo backendTabletsInfo) { logEdit(OperationType.OP_BACKEND_TABLETS_INFO, backendTabletsInfo); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/BackendReplicaInfosTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/BackendReplicaInfosTest.java new file mode 100644 index 00000000000000..54ceaf44b1030a --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/persist/BackendReplicaInfosTest.java @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.persist; + +import org.apache.doris.common.FeMetaVersion; +import org.apache.doris.meta.MetaContext; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.util.List; + +public class BackendReplicaInfosTest { + + long beId = 1000; + long tabletId1 = 2001; + long tabletId2 = 2002; + + @Test + public void testSerialization() throws Exception { + MetaContext metaContext = new MetaContext(); + metaContext.setMetaVersion(FeMetaVersion.VERSION_CURRENT); + metaContext.setThreadLocalInfo(); + + // 1. Write objects to file + File file = new File("./BackendReplicaInfosTest"); + file.createNewFile(); + DataOutputStream dos = new DataOutputStream(new FileOutputStream(file)); + + BackendReplicasInfo info = new BackendReplicasInfo(beId); + info.addBadReplica(tabletId1); + info.addMissingVersionReplica(tabletId2); + checkInfo(info); + info.write(dos); + dos.flush(); + dos.close(); + // 2. Read objects from file + DataInputStream dis = new DataInputStream(new FileInputStream(file)); + + BackendReplicasInfo rInfo1 = BackendReplicasInfo.read(dis); + checkInfo(rInfo1); + + // 3. delete files + dis.close(); + file.delete(); + } + + private void checkInfo(BackendReplicasInfo info) { + Assert.assertTrue(!info.isEmpty()); + List infos = info.getReplicaReportInfos(); + for (BackendReplicasInfo.ReplicaReportInfo reportInfo : infos) { + if (reportInfo.tabletId == tabletId1) { + Assert.assertEquals(BackendReplicasInfo.ReportInfoType.BAD, reportInfo.type); + } else if (reportInfo.tabletId == tabletId2) { + Assert.assertEquals(BackendReplicasInfo.ReportInfoType.MISSING_VERSION, reportInfo.type); + } else { + Assert.fail("unknown tablet id: " + reportInfo.tabletId); + } + } + } +} From 92818a7b505cd781aaeae765a8b2e7b492c3f5d4 Mon Sep 17 00:00:00 2001 From: morningman Date: Wed, 9 Mar 2022 11:19:21 +0800 Subject: [PATCH 5/6] rebase --- .../org/apache/doris/catalog/Replica.java | 4 +++ .../java/org/apache/doris/load/LoadJob.java | 26 ------------------- 2 files changed, 4 insertions(+), 26 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java index 0198955c5a1367..a42cebdc8a9ebf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java @@ -406,6 +406,10 @@ public boolean checkVersionCatchUp(long expectedVersion, boolean ignoreAlter) { return true; } + public void setLastFailedVersion(long lastFailedVersion) { + this.lastFailedVersion = lastFailedVersion; + } + public void setState(ReplicaState replicaState) { this.state = replicaState; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java index d3db3b4e8abdcc..0fa8a304da23f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java @@ -944,7 +944,6 @@ public void readFields(DataInput in) throws IOException { this.pullLoadSourceInfo = BrokerFileGroupAggInfo.read(in); } -<<<<<<< HEAD this.execMemLimit = in.readLong(); this.transactionId = in.readLong(); if (in.readBoolean()) { @@ -960,31 +959,6 @@ public void readFields(DataInput in) throws IOException { predicate = new IsNullPredicate(new SlotRef(null, key), true); } else { predicate = new IsNullPredicate(new SlotRef(null, key), true); -======= - if (version >= FeMetaVersion.VERSION_45) { - this.transactionId = in.readLong(); - if (in.readBoolean()) { - count = in.readInt(); - conditions = Lists.newArrayList(); - for (int i = 0; i < count; i++) { - String key = Text.readString(in); - String opStr = Text.readString(in); - if (opStr.equalsIgnoreCase("IS")) { - String value = Text.readString(in); - IsNullPredicate predicate; - if (value.equalsIgnoreCase("NOT NULL")) { - predicate = new IsNullPredicate(new SlotRef(null, key), true); - } else { - predicate = new IsNullPredicate(new SlotRef(null, key), true); - } - conditions.add(predicate); - } else { - Operator op = Operator.valueOf(opStr); - String value = Text.readString(in); - BinaryPredicate predicate = new BinaryPredicate(op, new SlotRef(null, key), - new StringLiteral(value)); - conditions.add(predicate); ->>>>>>> 1 } conditions.add(predicate); } else { From 244ba7e6b4c0ef7628feb95e5af00905fccee528 Mon Sep 17 00:00:00 2001 From: morningman Date: Wed, 9 Mar 2022 11:33:43 +0800 Subject: [PATCH 6/6] fix --- be/src/olap/tablet.cpp | 8 ++++---- be/src/olap/tablet.h | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index a359fbe572b8e3..a6f5648b075121 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -553,10 +553,10 @@ bool Tablet::_reconstruct_version_tracker_if_necessary() { OLAPStatus Tablet::capture_consistent_versions(const Version& spec_version, std::vector* version_path, - bool quite) const { + bool quiet) const { OLAPStatus status = _timestamped_version_tracker.capture_consistent_versions(spec_version, version_path); - if (status != OLAP_SUCCESS && !quite) { + if (status != OLAP_SUCCESS && !quiet) { std::vector missed_versions; calc_missed_versions_unlocked(spec_version.second, &missed_versions); if (missed_versions.empty()) { @@ -578,9 +578,9 @@ OLAPStatus Tablet::capture_consistent_versions(const Version& spec_version, return status; } -OLAPStatus Tablet::check_version_integrity(const Version& version, bool quite) { +OLAPStatus Tablet::check_version_integrity(const Version& version, bool quiet) { ReadLock rdlock(&_meta_lock); - return capture_consistent_versions(version, nullptr, quite); + return capture_consistent_versions(version, nullptr, quiet); } // If any rowset contains the specific version, it means the version already exist diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index c282c315a517af..23b384ee57d32d 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -116,12 +116,12 @@ class Tablet : public BaseTablet { void delete_expired_stale_rowset(); // Given spec_version, find a continuous version path and store it in version_path. - // If quite is true, then only "does this path exist" is returned. + // If quiet is true, then only "does this path exist" is returned. OLAPStatus capture_consistent_versions(const Version& spec_version, std::vector* version_path, - bool quite = false) const; - // if quite is true, no error log will be printed if there are missing versions - OLAPStatus check_version_integrity(const Version& version, bool quite = false); + bool quiet = false) const; + // if quiet is true, no error log will be printed if there are missing versions + OLAPStatus check_version_integrity(const Version& version, bool quiet = false); bool check_version_exist(const Version& version) const; void acquire_version_and_rowsets( std::vector>* version_rowsets) const;