diff --git a/be/src/http/action/debug_point_action.cpp b/be/src/http/action/debug_point_action.cpp index 04aa38efaa4b74..08b1e116b2bcd4 100644 --- a/be/src/http/action/debug_point_action.cpp +++ b/be/src/http/action/debug_point_action.cpp @@ -21,7 +21,6 @@ #include "http/http_channel.h" #include "http/http_status.h" #include "util/debug_points.h" -#include "util/time.h" namespace doris { @@ -44,16 +43,17 @@ void BaseDebugPointAction::handle(HttpRequest* req) { } Status AddDebugPointAction::_handle(HttpRequest* req) { - std::string name = req->param("debug_point"); + std::string debug_point = req->param("debug_point"); std::string execute = req->param("execute"); std::string timeout = req->param("timeout"); - if (name.empty()) { + if (debug_point.empty()) { return Status::InternalError("Empty debug point name"); } - auto debug_point = std::make_shared(); + int64_t execute_limit = -1; + int64_t timeout_second = -1; try { if (!execute.empty()) { - debug_point->execute_limit = std::stol(execute); + execute_limit = std::stol(execute); } } catch (const std::exception& e) { return Status::InternalError("Invalid execute limit format, execute {}, err {}", execute, @@ -61,19 +61,14 @@ Status AddDebugPointAction::_handle(HttpRequest* req) { } try { if (!timeout.empty()) { - int64_t timeout_second = std::stol(timeout); - if (timeout_second > 0) { - debug_point->expire_ms = MonotonicMillis() + timeout_second * MILLIS_PER_SEC; - } + timeout_second = std::stol(timeout); } } catch (const std::exception& e) { return Status::InternalError("Invalid timeout format, timeout {}, err {}", timeout, e.what()); } - debug_point->params = *(req->params()); - - DebugPoints::instance()->add(name, debug_point); + DebugPoints::instance()->add(debug_point, execute_limit, timeout_second); return Status::OK(); } diff --git a/be/src/util/debug_points.cpp b/be/src/util/debug_points.cpp index 43bb39df9a4622..587f8c944a343c 100644 --- a/be/src/util/debug_points.cpp +++ b/be/src/util/debug_points.cpp @@ -30,42 +30,37 @@ DebugPoints* DebugPoints::instance() { } bool DebugPoints::is_enable(const std::string& name) { - return get_debug_point(name) != nullptr; -} - -std::shared_ptr DebugPoints::get_debug_point(const std::string& name) { if (!config::enable_debug_points) { - return nullptr; + return false; } auto map_ptr = std::atomic_load_explicit(&_debug_points, std::memory_order_relaxed); auto it = map_ptr->find(name); if (it == map_ptr->end()) { - return nullptr; + return false; } - auto debug_point = it->second; - if ((debug_point->expire_ms > 0 && MonotonicMillis() >= debug_point->expire_ms) || - (debug_point->execute_limit > 0 && - debug_point->execute_num.fetch_add(1, std::memory_order_relaxed) >= - debug_point->execute_limit)) { + auto& debug_point = *(it->second); + if ((debug_point.expire_ms > 0 && MonotonicMillis() >= debug_point.expire_ms) || + (debug_point.execute_limit > 0 && + debug_point.execute_num.fetch_add(1, std::memory_order_relaxed) >= + debug_point.execute_limit)) { remove(name); - return nullptr; + return false; } - return debug_point; + return true; } -void DebugPoints::add(const std::string& name, std::shared_ptr debug_point) { - update([&](DebugPointMap& new_points) { new_points[name] = debug_point; }); - - std::ostringstream oss; - oss << "{"; - for (auto [key, value] : debug_point->params) { - oss << key << " : " << value << ", "; +void DebugPoints::add(const std::string& name, int64_t execute_limit, int64_t timeout_second) { + auto debug_point = std::make_shared(); + debug_point->execute_limit = execute_limit; + if (timeout_second > 0) { + debug_point->expire_ms = MonotonicMillis() + timeout_second * MILLIS_PER_SEC; } - oss << "}"; + update([&](DebugPointMap& new_points) { new_points[name] = debug_point; }); - LOG(INFO) << "add debug point: name=" << name << ", params=" << oss.str(); + LOG(INFO) << "add debug point: name=" << name << ", execute=" << execute_limit + << ", timeout=" << timeout_second; } void DebugPoints::remove(const std::string& name) { diff --git a/be/src/util/debug_points.h b/be/src/util/debug_points.h index 1106a548f8d928..704405689cc621 100644 --- a/be/src/util/debug_points.h +++ b/be/src/util/debug_points.h @@ -18,23 +18,19 @@ #pragma once #include -#include #include #include #include -#include +#include #include "common/compiler_util.h" #include "common/config.h" -#include "fmt/format.h" -// more usage can see 'util/debug_points_test.cpp' -#define DBUG_EXECUTE_IF(debug_point_name, code) \ - if (UNLIKELY(config::enable_debug_points)) { \ - auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \ - if (dp) { \ - code; \ - } \ +#define DBUG_EXECUTE_IF(debug_point, code) \ + if (UNLIKELY(config::enable_debug_points)) { \ + if (DebugPoints::instance()->is_enable(debug_point)) { \ + code; \ + } \ } namespace doris { @@ -43,69 +39,15 @@ struct DebugPoint { std::atomic execute_num {0}; int64_t execute_limit = -1; int64_t expire_ms = -1; - - std::map params; - - template - T param(const std::string& key, T default_value = T()) { - auto it = params.find(key); - if (it == params.end()) { - return default_value; - } - if constexpr (std::is_same_v) { - if (it->second == "true") { - return true; - } - if (it->second == "false") { - return false; - } - return boost::lexical_cast(it->second); - } else if constexpr (std::is_arithmetic_v) { - return boost::lexical_cast(it->second); - } else if constexpr (std::is_same_v) { - return it->second.c_str(); - } else { - static_assert(std::is_same_v); - return it->second; - } - } }; class DebugPoints { public: bool is_enable(const std::string& name); - std::shared_ptr get_debug_point(const std::string& name); + void add(const std::string& name, int64_t execute_limit, int64_t timeout_second); void remove(const std::string& name); void clear(); - // if not enable debug point or its params not contains `key`, then return `default_value` - // url: /api/debug_point/add/name?k1=v1&k2=v2&... - template - T get_debug_param_or_default(const std::string& name, const std::string& key, - const T& default_value) { - auto debug_point = get_debug_point(name); - return debug_point ? debug_point->param(key, default_value) : default_value; - } - - // url: /api/debug_point/add/name?value=v - template - T get_debug_param_or_default(const std::string& name, const T& default_value) { - return get_debug_param_or_default(name, "value", default_value); - } - - void add(const std::string& name, std::shared_ptr debug_point); - - // more 'add' functions for convenient use - void add(const std::string& name) { add(name, std::make_shared()); } - void add_with_params(const std::string& name, - const std::map& params) { - add(name, std::shared_ptr(new DebugPoint {.params = params})); - } - template - void add_with_value(const std::string& name, const T& value) { - add_with_params(name, {{"value", fmt::format("{}", value)}}); - } - static DebugPoints* instance(); private: diff --git a/be/test/util/debug_points_test.cpp b/be/test/util/debug_points_test.cpp index 76c4fd00781aa9..c2cf2bdedfd7b2 100644 --- a/be/test/util/debug_points_test.cpp +++ b/be/test/util/debug_points_test.cpp @@ -54,46 +54,6 @@ TEST(DebugPointsTest, BaseTest) { EXPECT_TRUE(DebugPoints::instance()->is_enable("dbug4")); std::this_thread::sleep_for(std::chrono::milliseconds(1000)); EXPECT_FALSE(DebugPoints::instance()->is_enable("dbug4")); - - POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/dbug5?v1=1&v2=a&v3=1.2&v4=true&v5=false"); - EXPECT_TRUE(DebugPoints::instance()->is_enable("dbug5")); - DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(1, dp->param("v1", 100))); - DBUG_EXECUTE_IF("dbug5", EXPECT_EQ("a", dp->param("v2"))); - DBUG_EXECUTE_IF("dbug5", EXPECT_EQ("a", dp->param("v2", std::string()))); - DBUG_EXECUTE_IF("dbug5", EXPECT_STREQ("a", dp->param("v2", "b"))); - DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(1.2, dp->param("v3"))); - DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(1.2, dp->param("v3", 0.0))); - DBUG_EXECUTE_IF("dbug5", EXPECT_TRUE(dp->param("v4", false))); - DBUG_EXECUTE_IF("dbug5", EXPECT_FALSE(dp->param("v5", false))); - DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(0L, dp->param("v_not_exist"))); - DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(0L, dp->param("v_not_exist", 0L))); - DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(123, dp->param("v_not_exist", 123))); - DBUG_EXECUTE_IF("dbug5", EXPECT_STREQ("abcd", dp->param("v_not_exist", "abcd"))); - - EXPECT_EQ(1.2, DebugPoints::instance()->get_debug_param_or_default("dbug5", "v3", 0.0)); - EXPECT_EQ(100, - DebugPoints::instance()->get_debug_param_or_default("point_not_exists", "k", 100)); - - POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/dbug6?value=567"); - EXPECT_EQ(567, DebugPoints::instance()->get_debug_param_or_default("dbug6", 0)); -} - -TEST(DebugPointsTest, AddTest) { - config::enable_debug_points = true; - DebugPoints::instance()->clear(); - - DebugPoints::instance()->add("dbug1"); - EXPECT_TRUE(DebugPoints::instance()->is_enable("dbug1")); - - DebugPoints::instance()->add_with_params("dbug2", {{"k1", "100"}}); - EXPECT_EQ(100, DebugPoints::instance()->get_debug_param_or_default("dbug2", "k1", 0)); - - DebugPoints::instance()->add_with_value("dbug3", 567); - EXPECT_EQ(567, DebugPoints::instance()->get_debug_param_or_default("dbug3", 567)); - - DebugPoints::instance()->add_with_value("dbug4", "hello"); - EXPECT_EQ("hello", - DebugPoints::instance()->get_debug_param_or_default("dbug4", "")); } } // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java index 631f2ebaf3ba05..e55eab8939216f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java @@ -20,7 +20,6 @@ import org.apache.doris.common.Config; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; -import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.thrift.TUniqueId; import com.google.gson.annotations.SerializedName; @@ -115,14 +114,6 @@ public enum ReplicaStatus { private TUniqueId cooldownMetaId; private long cooldownTerm = -1; - // A replica version should increase monotonically, - // but backend may missing some versions due to disk failure or bugs. - // FE should found these and mark the replica as missing versions. - // If backend's report version < fe version, record the backend's report version as `regressiveVersion`, - // and if time exceed 5min, fe should mark this replica as missing versions. - private long regressiveVersion = -1; - private long regressiveVersionTimestamp = 0; - /* * This can happen when this replica is created by a balance clone task, and * when task finished, the version of this replica is behind the partition's visible version. @@ -444,9 +435,9 @@ private void updateReplicaInfo(long newVersion, if (lastFailedVersion != this.lastFailedVersion) { // Case 2: - if (lastFailedVersion > this.lastFailedVersion || lastFailedVersion < 0) { + if (lastFailedVersion > this.lastFailedVersion) { this.lastFailedVersion = lastFailedVersion; - this.lastFailedTimestamp = lastFailedVersion > 0 ? System.currentTimeMillis() : -1L; + this.lastFailedTimestamp = System.currentTimeMillis(); } this.lastSuccessVersion = this.version; @@ -515,6 +506,10 @@ public boolean checkVersionCatchUp(long expectedVersion, boolean ignoreAlter) { return true; } + public void setLastFailedVersion(long lastFailedVersion) { + this.lastFailedVersion = lastFailedVersion; + } + public void setState(ReplicaState replicaState) { this.state = replicaState; } @@ -539,25 +534,6 @@ public void setVersionCount(long versionCount) { this.versionCount = versionCount; } - public boolean checkVersionRegressive(long newVersion) { - if (newVersion >= version) { - regressiveVersion = -1; - regressiveVersionTimestamp = -1; - return false; - } - - if (DebugPointUtil.isEnable("Replica.regressive_version_immediately")) { - return true; - } - - if (newVersion != regressiveVersion) { - regressiveVersion = newVersion; - regressiveVersionTimestamp = System.currentTimeMillis(); - } - - return System.currentTimeMillis() - regressiveVersionTimestamp >= 5 * 60 * 1000L; - } - @Override public String toString() { StringBuilder strBuffer = new StringBuilder("[replicaId="); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index c1b7ca293bcda2..39818d52bc62f9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -392,22 +392,10 @@ private boolean needSync(Replica replicaInFe, TTabletInfo backendTabletInfo) { if (backendTabletInfo.getVersion() > versionInFe) { // backend replica's version is larger or newer than replica in FE, sync it. return true; - } else if (versionInFe == backendTabletInfo.getVersion()) { + } else if (versionInFe == backendTabletInfo.getVersion() && replicaInFe.isBad()) { // backend replica's version is equal to replica in FE, but replica in FE is bad, // while backend replica is good, sync it - if (replicaInFe.isBad()) { - return true; - } - - // FE' s replica last failed version > partition's committed version - // this can be occur when be report miss version, fe will set last failed version = visible version + 1 - // then last failed version may greater than partition's committed version - // - // But here cannot got variable partition, we just check lastFailedVersion = version + 1, - // In ReportHandler.sync, we will check if last failed version > partition's committed version again. - if (replicaInFe.getLastFailedVersion() == versionInFe + 1) { - return true; - } + return true; } return false; @@ -515,12 +503,6 @@ private boolean needRecover(Replica replicaInFe, int schemaHashInFe, TTabletInfo // so we only return true if version_miss is true. return true; } - - // backend versions regressive due to bugs - if (replicaInFe.checkVersionRegressive(backendTabletInfo.getVersion())) { - return true; - } - return false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index 3f52210e8f1324..b4667f80696d75 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -1074,13 +1074,6 @@ public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request) replica.updateVersionInfo(reportedTablet.getVersion(), reportedTablet.getDataSize(), reportedTablet.getDataSize(), reportedTablet.getRowCount()); - if (replica.getLastFailedVersion() > partition.getCommittedVersion() - && reportedTablet.getVersion() >= partition.getCommittedVersion() - //&& !(reportedTablet.isSetVersionMiss() && reportedTablet.isVersionMiss() - && !(reportedTablet.isSetUsed() && !reportedTablet.isUsed())) { - LOG.info("change replica {} of tablet {} 's last failed version to -1", replica, tabletId); - replica.updateLastFailedVersion(-1L); - } if (reportedTablet.isSetPathHash()) { replica.setPathHash(reportedTablet.getPathHash()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java index da06232f0c0f5e..aab9b8f2ba6a53 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java @@ -19,8 +19,6 @@ import org.apache.doris.common.Config; -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -30,114 +28,45 @@ /** * Use for manage debug points. - * - * usage example can see DebugPointUtilTest.java - * **/ public class DebugPointUtil { private static final Logger LOG = LogManager.getLogger(DebugPointUtil.class); private static final Map debugPoints = new ConcurrentHashMap<>(); - public static class DebugPoint { + private static class DebugPoint { public AtomicInteger executeNum = new AtomicInteger(0); public int executeLimit = -1; public long expireTime = -1; - - // params - public Map params = Maps.newHashMap(); - - public E param(String key, E defaultValue) { - Preconditions.checkState(defaultValue != null); - - String value = params.get(key); - if (value == null) { - return defaultValue; - } - if (defaultValue instanceof Boolean) { - return (E) Boolean.valueOf(value); - } - if (defaultValue instanceof Byte) { - return (E) Byte.valueOf(value); - } - if (defaultValue instanceof Character) { - Preconditions.checkState(value.length() == 1); - return (E) Character.valueOf(value.charAt(0)); - } - if (defaultValue instanceof Short) { - return (E) Short.valueOf(value); - } - if (defaultValue instanceof Integer) { - return (E) Integer.valueOf(value); - } - if (defaultValue instanceof Long) { - return (E) Long.valueOf(value); - } - if (defaultValue instanceof Float) { - return (E) Float.valueOf(value); - } - if (defaultValue instanceof Double) { - return (E) Double.valueOf(value); - } - if (defaultValue instanceof String) { - return (E) value; - } - - Preconditions.checkState(false, "Can not convert with default value=" + defaultValue); - - return defaultValue; - } } public static boolean isEnable(String debugPointName) { - return getDebugPoint(debugPointName) != null; - } - - public static DebugPoint getDebugPoint(String debugPointName) { if (!Config.enable_debug_points) { - return null; + return false; } DebugPoint debugPoint = debugPoints.get(debugPointName); if (debugPoint == null) { - return null; + return false; } if ((debugPoint.expireTime > 0 && System.currentTimeMillis() >= debugPoint.expireTime) || (debugPoint.executeLimit > 0 && debugPoint.executeNum.incrementAndGet() > debugPoint.executeLimit)) { debugPoints.remove(debugPointName); - return null; + return false; } - return debugPoint; - } - - // if not enable debug point or its params not contains `key`, then return `defaultValue` - // url: /api/debug_point/add/name?k1=v1&k2=v2&... - public static E getDebugParamOrDefault(String debugPointName, String key, E defaultValue) { - DebugPoint debugPoint = getDebugPoint(debugPointName); - - return debugPoint != null ? debugPoint.param(key, defaultValue) : defaultValue; - } - - // url: /api/debug_point/add/name?value=v - public static E getDebugParamOrDefault(String debugPointName, E defaultValue) { - return getDebugParamOrDefault(debugPointName, "value", defaultValue); - } - - public static void addDebugPoint(String name, DebugPoint debugPoint) { - debugPoints.put(name, debugPoint); - LOG.info("add debug point: name={}, params={}", name, debugPoint.params); - } - - public static void addDebugPoint(String name) { - addDebugPoint(name, new DebugPoint()); + return true; } - public static void addDebugPointWithValue(String name, E value) { + public static void addDebugPoint(String name, int executeLimit, long timeoutSecond) { DebugPoint debugPoint = new DebugPoint(); - debugPoint.params.put("value", String.format("%s", value)); - addDebugPoint(name, debugPoint); + debugPoint.executeLimit = executeLimit; + if (timeoutSecond > 0) { + debugPoint.expireTime = System.currentTimeMillis() + timeoutSecond * 1000; + } + debugPoints.put(name, debugPoint); + LOG.info("add debug point: name={}, execute={}, timeout seconds={}", name, executeLimit, timeoutSecond); } public static void removeDebugPoint(String name) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/DebugPointAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/DebugPointAction.java index 8c102fd0ae45a8..25ee7a5d0a6a3d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/DebugPointAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/DebugPointAction.java @@ -19,7 +19,6 @@ import org.apache.doris.common.Config; import org.apache.doris.common.util.DebugPointUtil; -import org.apache.doris.common.util.DebugPointUtil.DebugPoint; import org.apache.doris.httpv2.entity.ResponseEntityBuilder; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; @@ -41,7 +40,7 @@ public class DebugPointAction extends RestBaseController { @RequestMapping(path = "/api/debug_point/add/{debugPoint}", method = RequestMethod.POST) - protected Object addDebugPoint(@PathVariable("debugPoint") String name, + protected Object addDebugPoint(@PathVariable("debugPoint") String debugPoint, @RequestParam(name = "execute", required = false, defaultValue = "") String execute, @RequestParam(name = "timeout", required = false, defaultValue = "") String timeout, HttpServletRequest request, HttpServletResponse response) { @@ -51,38 +50,28 @@ protected Object addDebugPoint(@PathVariable("debugPoint") String name, } executeCheckPassword(request, response); checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN); - if (Strings.isNullOrEmpty(name)) { + if (Strings.isNullOrEmpty(debugPoint)) { return ResponseEntityBuilder.badRequest("Empty debug point name."); } - - DebugPoint debugPoint = new DebugPoint(); + int executeLimit = -1; if (!Strings.isNullOrEmpty(execute)) { try { - debugPoint.executeLimit = Integer.valueOf(execute); + executeLimit = Integer.valueOf(execute); } catch (Exception e) { return ResponseEntityBuilder.badRequest( "Invalid execute format: " + execute + ", err " + e.getMessage()); } } + long timeoutSeconds = -1; if (!Strings.isNullOrEmpty(timeout)) { try { - long timeoutSeconds = Long.valueOf(timeout); - if (timeoutSeconds > 0) { - debugPoint.expireTime = System.currentTimeMillis() + timeoutSeconds * 1000; - } + timeoutSeconds = Long.valueOf(timeout); } catch (Exception e) { return ResponseEntityBuilder.badRequest( "Invalid timeout format: " + timeout + ", err " + e.getMessage()); } } - request.getParameterMap().forEach((key, values) -> { - if (values != null && values.length > 0) { - debugPoint.params.put(key, values[0]); - } - }); - - DebugPointUtil.addDebugPoint(name, debugPoint); - + DebugPointUtil.addDebugPoint(debugPoint, executeLimit, timeoutSeconds); return ResponseEntityBuilder.ok(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java index 64b771663b2465..2833eff5f3d0e8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java @@ -192,7 +192,7 @@ public TMasterResult finishTask(TFinishTaskRequest request) { finishRecoverTablet(task); break; case ALTER: - finishAlterTask(task, request); + finishAlterTask(task); break; case ALTER_INVERTED_INDEX: finishAlterInvertedIndexTask(task, request); @@ -575,7 +575,7 @@ public TMasterResult report(TReportRequest request) throws TException { return reportHandler.handleReport(request); } - private void finishAlterTask(AgentTask task, TFinishTaskRequest request) { + private void finishAlterTask(AgentTask task) { AlterReplicaTask alterTask = (AlterReplicaTask) task; try { if (alterTask.getJobType() == JobType.ROLLUP) { @@ -584,11 +584,6 @@ private void finishAlterTask(AgentTask task, TFinishTaskRequest request) { Env.getCurrentEnv().getSchemaChangeHandler().handleFinishAlterTask(alterTask); } alterTask.setFinished(true); - if (request.isSetReportVersion()) { - long reportVersion = request.getReportVersion(); - Env.getCurrentSystemInfo().updateBackendReportVersion( - task.getBackendId(), reportVersion, task.getDbId(), task.getTableId()); - } } catch (MetaNotFoundException e) { LOG.warn("failed to handle finish alter task: {}, {}", task.getSignature(), e.getMessage()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 5e48d31bb44b9f..73a530f294ed7a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -404,8 +404,7 @@ private static void diffResource(List storageResourcesInBe, Li } } - // public for fe ut - public static void tabletReport(long backendId, Map backendTablets, long backendReportVersion) { + private static void tabletReport(long backendId, Map backendTablets, long backendReportVersion) { long start = System.currentTimeMillis(); LOG.info("backend[{}] reports {} tablet(s). report version: {}", backendId, backendTablets.size(), backendReportVersion); @@ -609,11 +608,6 @@ private static void sync(Map backendTablets, ListMultimap backendTablets, ListMultimap= partition.getCommittedVersion() - && replica.getLastFailedVersion() > partition.getCommittedVersion()) { - LOG.info("sync replica {} of tablet {} in backend {} in db {}. replica last failed" - + " version change to -1 because last failed version > replica's committed" - + " version {}", - replica, tabletId, backendId, dbId, partition.getCommittedVersion()); - replica.updateLastFailedVersion(-1L); - needSync = true; + if (metaVersion < backendVersion + || (metaVersion == backendVersion && replica.isBad())) { + + if (backendReportVersion < Env.getCurrentSystemInfo() + .getBackendReportVersion(backendId)) { + continue; } - } - if (needSync) { // happens when // 1. PUSH finished in BE but failed or not yet report to FE // 2. repair for VERSION_INCOMPLETE finished in BE, but failed or not yet report to FE @@ -1066,25 +1049,18 @@ private static void handleRecoverTablet(ListMultimap tabletRecoveryM break; } - if ((tTabletInfo.isSetVersionMiss() && tTabletInfo.isVersionMiss()) - || replica.checkVersionRegressive(tTabletInfo.getVersion())) { + if (tTabletInfo.isSetVersionMiss() && tTabletInfo.isVersionMiss()) { // If the origin last failed version is larger than 0, not change it. // Otherwise, we set last failed version to replica'version + 1. // Because last failed version should always larger than replica's version. long newLastFailedVersion = replica.getLastFailedVersion(); if (newLastFailedVersion < 0) { newLastFailedVersion = replica.getVersion() + 1; - replica.updateLastFailedVersion(newLastFailedVersion); - LOG.warn("set missing version for replica {} of tablet {} on backend {}, " - + "version in fe {}, version in be {}, be missing {}", - replica.getId(), tabletId, backendId, replica.getVersion(), - tTabletInfo.getVersion(), tTabletInfo.isVersionMiss()); } + replica.updateLastFailedVersion(newLastFailedVersion); backendReplicasInfo.addMissingVersionReplica(tabletId, newLastFailedVersion); break; } - - break; } } } finally { diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java deleted file mode 100644 index f56a55d58747d2..00000000000000 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java +++ /dev/null @@ -1,177 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.clone; - -import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.MaterializedIndex; -import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.Partition; -import org.apache.doris.catalog.Replica; -import org.apache.doris.catalog.Tablet; -import org.apache.doris.common.Config; -import org.apache.doris.common.FeConstants; -import org.apache.doris.common.util.DebugPointUtil; -import org.apache.doris.common.util.DebugPointUtil.DebugPoint; -import org.apache.doris.master.ReportHandler; -import org.apache.doris.thrift.TTablet; -import org.apache.doris.thrift.TTabletInfo; -import org.apache.doris.utframe.TestWithFeService; - -import com.google.common.collect.Maps; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import java.util.Map; - -public class RepairVersionTest extends TestWithFeService { - private class TableInfo { - Partition partition; - Tablet tablet; - Replica replica; - } - - @Override - protected void beforeCreatingConnectContext() throws Exception { - Config.enable_debug_points = true; - Config.disable_balance = true; - Config.disable_tablet_scheduler = true; - Config.allow_replica_on_same_host = true; - FeConstants.tablet_checker_interval_ms = 100; - FeConstants.tablet_schedule_interval_ms = 100; - } - - @Override - protected void runBeforeAll() throws Exception { - createDatabase("test"); - } - - @Override - protected int backendNum() { - return 2; - } - - @Test - public void testRepairLastFailedVersionByClone() throws Exception { - TableInfo info = prepareTableForTest("tbl_repair_last_fail_version_by_clone"); - Partition partition = info.partition; - Replica replica = info.replica; - - replica.updateLastFailedVersion(replica.getVersion() + 1); - Assertions.assertEquals(partition.getCommittedVersion() + 1, replica.getLastFailedVersion()); - - Config.disable_tablet_scheduler = false; - Thread.sleep(1000); - Config.disable_tablet_scheduler = true; - - Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); - Assertions.assertEquals(-1L, replica.getLastFailedVersion()); - } - - @Test - public void testRepairLastFailedVersionByReport() throws Exception { - TableInfo info = prepareTableForTest("tbl_repair_last_fail_version_by_report"); - Partition partition = info.partition; - Tablet tablet = info.tablet; - Replica replica = info.replica; - - replica.updateLastFailedVersion(replica.getVersion() + 1); - Assertions.assertEquals(partition.getCommittedVersion() + 1, replica.getLastFailedVersion()); - - TTabletInfo tTabletInfo = new TTabletInfo(); - tTabletInfo.setTabletId(tablet.getId()); - tTabletInfo.setSchemaHash(replica.getSchemaHash()); - tTabletInfo.setVersion(replica.getVersion()); - tTabletInfo.setPathHash(replica.getPathHash()); - tTabletInfo.setPartitionId(partition.getId()); - tTabletInfo.setReplicaId(replica.getId()); - - TTablet tTablet = new TTablet(); - tTablet.addToTabletInfos(tTabletInfo); - Map tablets = Maps.newHashMap(); - tablets.put(tablet.getId(), tTablet); - Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); - - ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L); - - Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); - Assertions.assertEquals(-1L, replica.getLastFailedVersion()); - } - - @Test - public void testVersionRegressive() throws Exception { - TableInfo info = prepareTableForTest("tbl_version_regressive"); - Partition partition = info.partition; - Tablet tablet = info.tablet; - Replica replica = info.replica; - - Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); - Assertions.assertEquals(-1L, replica.getLastFailedVersion()); - Assertions.assertTrue(replica.getVersion() > 1L); - - TTabletInfo tTabletInfo = new TTabletInfo(); - tTabletInfo.setTabletId(tablet.getId()); - tTabletInfo.setSchemaHash(replica.getSchemaHash()); - tTabletInfo.setVersion(1L); // be report version = 1 which less than fe version - tTabletInfo.setPathHash(replica.getPathHash()); - tTabletInfo.setPartitionId(partition.getId()); - tTabletInfo.setReplicaId(replica.getId()); - - TTablet tTablet = new TTablet(); - tTablet.addToTabletInfos(tTabletInfo); - Map tablets = Maps.newHashMap(); - tablets.put(tablet.getId(), tTablet); - - ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L); - Assertions.assertEquals(-1L, replica.getLastFailedVersion()); - - DebugPointUtil.addDebugPoint("Replica.regressive_version_immediately", new DebugPoint()); - ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L); - Assertions.assertEquals(replica.getVersion() + 1, replica.getLastFailedVersion()); - - Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); - } - - private TableInfo prepareTableForTest(String tableName) throws Exception { - createTable("CREATE TABLE test." + tableName + " (k INT) DISTRIBUTED BY HASH(k) " - + " BUCKETS 1 PROPERTIES ( \"replication_num\" = \"2\" )"); - - Database db = Env.getCurrentInternalCatalog().getDbOrMetaException("default_cluster:test"); - OlapTable tbl = (OlapTable) db.getTableOrMetaException(tableName); - Assertions.assertNotNull(tbl); - Partition partition = tbl.getPartitions().iterator().next(); - Tablet tablet = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL).iterator().next() - .getTablets().iterator().next(); - - long visibleVersion = 2L; - partition.updateVisibleVersion(visibleVersion); - partition.setNextVersion(visibleVersion + 1); - tablet.getReplicas().forEach(replica -> replica.updateVersionInfo(visibleVersion, 1L, 1L, 1L)); - - Replica replica = tablet.getReplicas().iterator().next(); - Assertions.assertEquals(visibleVersion, replica.getVersion()); - Assertions.assertEquals(-1L, replica.getLastFailedVersion()); - - TableInfo info = new TableInfo(); - info.partition = partition; - info.tablet = tablet; - info.replica = replica; - - return info; - } -} diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugPointUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugPointUtilTest.java index 0a68885bf26e56..2845cec9225d39 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugPointUtilTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugPointUtilTest.java @@ -18,7 +18,6 @@ package org.apache.doris.common.util; import org.apache.doris.common.Config; -import org.apache.doris.common.util.DebugPointUtil.DebugPoint; import org.apache.doris.http.DorisHttpTestCase; import okhttp3.Request; @@ -55,23 +54,6 @@ public void testDebugPoint() throws Exception { Assert.assertTrue(DebugPointUtil.isEnable("dbug4")); Thread.sleep(1000); Assert.assertFalse(DebugPointUtil.isEnable("dbug4")); - - sendRequest("/api/debug_point/add/dbug5?v1=1&v2=a&v3=1.2&v4=true&v5=false"); - Assert.assertTrue(DebugPointUtil.isEnable("dbug5")); - DebugPoint debugPoint = DebugPointUtil.getDebugPoint("dbug5"); - Assert.assertNotNull(debugPoint); - Assert.assertEquals(1, (int) debugPoint.param("v1", 0)); - Assert.assertEquals("a", debugPoint.param("v2", "")); - Assert.assertEquals(1.2, debugPoint.param("v3", 0.0), 1e-6); - Assert.assertTrue(debugPoint.param("v4", false)); - Assert.assertFalse(debugPoint.param("v5", false)); - Assert.assertEquals(123L, (long) debugPoint.param("v_no_exist", 123L)); - - Assert.assertEquals(1, (int) DebugPointUtil.getDebugParamOrDefault("dbug5", "v1", 0)); - Assert.assertEquals(100, (int) DebugPointUtil.getDebugParamOrDefault("point_not_exists", "v1", 100)); - - sendRequest("/api/debug_point/add/dbug6?value=100"); - Assert.assertEquals(100, (int) DebugPointUtil.getDebugParamOrDefault("dbug6", 0)); } private void sendRequest(String uri) throws Exception { diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index ac4fa2660db7ea..a17fcdf72bc699 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -392,7 +392,7 @@ protected void createDorisCluster(String runningDir, int backendNum) InterruptedException { int feRpcPort = startFEServer(runningDir); List bes = Lists.newArrayList(); - System.out.println("start create backend, backend num " + backendNum); + System.out.println("start create backend"); for (int i = 0; i < backendNum; i++) { bes.add(createBackend("127.0.0.1", feRpcPort)); }