Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 7 additions & 12 deletions be/src/http/action/debug_point_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include "http/http_channel.h"
#include "http/http_status.h"
#include "util/debug_points.h"
#include "util/time.h"

namespace doris {

Expand All @@ -44,36 +43,32 @@ void BaseDebugPointAction::handle(HttpRequest* req) {
}

Status AddDebugPointAction::_handle(HttpRequest* req) {
std::string name = req->param("debug_point");
std::string debug_point = req->param("debug_point");
std::string execute = req->param("execute");
std::string timeout = req->param("timeout");
if (name.empty()) {
if (debug_point.empty()) {
return Status::InternalError("Empty debug point name");
}
auto debug_point = std::make_shared<DebugPoint>();
int64_t execute_limit = -1;
int64_t timeout_second = -1;
try {
if (!execute.empty()) {
debug_point->execute_limit = std::stol(execute);
execute_limit = std::stol(execute);
}
} catch (const std::exception& e) {
return Status::InternalError("Invalid execute limit format, execute {}, err {}", execute,
e.what());
}
try {
if (!timeout.empty()) {
int64_t timeout_second = std::stol(timeout);
if (timeout_second > 0) {
debug_point->expire_ms = MonotonicMillis() + timeout_second * MILLIS_PER_SEC;
}
timeout_second = std::stol(timeout);
}
} catch (const std::exception& e) {
return Status::InternalError("Invalid timeout format, timeout {}, err {}", timeout,
e.what());
}

debug_point->params = *(req->params());

DebugPoints::instance()->add(name, debug_point);
DebugPoints::instance()->add(debug_point, execute_limit, timeout_second);

return Status::OK();
}
Expand Down
39 changes: 17 additions & 22 deletions be/src/util/debug_points.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,42 +30,37 @@ DebugPoints* DebugPoints::instance() {
}

bool DebugPoints::is_enable(const std::string& name) {
return get_debug_point(name) != nullptr;
}

std::shared_ptr<DebugPoint> DebugPoints::get_debug_point(const std::string& name) {
if (!config::enable_debug_points) {
return nullptr;
return false;
}
auto map_ptr = std::atomic_load_explicit(&_debug_points, std::memory_order_relaxed);
auto it = map_ptr->find(name);
if (it == map_ptr->end()) {
return nullptr;
return false;
}

auto debug_point = it->second;
if ((debug_point->expire_ms > 0 && MonotonicMillis() >= debug_point->expire_ms) ||
(debug_point->execute_limit > 0 &&
debug_point->execute_num.fetch_add(1, std::memory_order_relaxed) >=
debug_point->execute_limit)) {
auto& debug_point = *(it->second);
if ((debug_point.expire_ms > 0 && MonotonicMillis() >= debug_point.expire_ms) ||
(debug_point.execute_limit > 0 &&
debug_point.execute_num.fetch_add(1, std::memory_order_relaxed) >=
debug_point.execute_limit)) {
remove(name);
return nullptr;
return false;
}

return debug_point;
return true;
}

void DebugPoints::add(const std::string& name, std::shared_ptr<DebugPoint> debug_point) {
update([&](DebugPointMap& new_points) { new_points[name] = debug_point; });

std::ostringstream oss;
oss << "{";
for (auto [key, value] : debug_point->params) {
oss << key << " : " << value << ", ";
void DebugPoints::add(const std::string& name, int64_t execute_limit, int64_t timeout_second) {
auto debug_point = std::make_shared<DebugPoint>();
debug_point->execute_limit = execute_limit;
if (timeout_second > 0) {
debug_point->expire_ms = MonotonicMillis() + timeout_second * MILLIS_PER_SEC;
}
oss << "}";
update([&](DebugPointMap& new_points) { new_points[name] = debug_point; });

LOG(INFO) << "add debug point: name=" << name << ", params=" << oss.str();
LOG(INFO) << "add debug point: name=" << name << ", execute=" << execute_limit
<< ", timeout=" << timeout_second;
}

void DebugPoints::remove(const std::string& name) {
Expand Down
72 changes: 7 additions & 65 deletions be/src/util/debug_points.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,19 @@
#pragma once

#include <atomic>
#include <boost/lexical_cast.hpp>
#include <functional>
#include <map>
#include <memory>
#include <type_traits>
#include <string>

#include "common/compiler_util.h"
#include "common/config.h"
#include "fmt/format.h"

// more usage can see 'util/debug_points_test.cpp'
#define DBUG_EXECUTE_IF(debug_point_name, code) \
if (UNLIKELY(config::enable_debug_points)) { \
auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \
if (dp) { \
code; \
} \
#define DBUG_EXECUTE_IF(debug_point, code) \
if (UNLIKELY(config::enable_debug_points)) { \
if (DebugPoints::instance()->is_enable(debug_point)) { \
code; \
} \
}

namespace doris {
Expand All @@ -43,69 +39,15 @@ struct DebugPoint {
std::atomic<int64_t> execute_num {0};
int64_t execute_limit = -1;
int64_t expire_ms = -1;

std::map<std::string, std::string> params;

template <typename T>
T param(const std::string& key, T default_value = T()) {
auto it = params.find(key);
if (it == params.end()) {
return default_value;
}
if constexpr (std::is_same_v<T, bool>) {
if (it->second == "true") {
return true;
}
if (it->second == "false") {
return false;
}
return boost::lexical_cast<T>(it->second);
} else if constexpr (std::is_arithmetic_v<T>) {
return boost::lexical_cast<T>(it->second);
} else if constexpr (std::is_same_v<T, const char*>) {
return it->second.c_str();
} else {
static_assert(std::is_same_v<T, std::string>);
return it->second;
}
}
};

class DebugPoints {
public:
bool is_enable(const std::string& name);
std::shared_ptr<DebugPoint> get_debug_point(const std::string& name);
void add(const std::string& name, int64_t execute_limit, int64_t timeout_second);
void remove(const std::string& name);
void clear();

// if not enable debug point or its params not contains `key`, then return `default_value`
// url: /api/debug_point/add/name?k1=v1&k2=v2&...
template <typename T>
T get_debug_param_or_default(const std::string& name, const std::string& key,
const T& default_value) {
auto debug_point = get_debug_point(name);
return debug_point ? debug_point->param(key, default_value) : default_value;
}

// url: /api/debug_point/add/name?value=v
template <typename T>
T get_debug_param_or_default(const std::string& name, const T& default_value) {
return get_debug_param_or_default(name, "value", default_value);
}

void add(const std::string& name, std::shared_ptr<DebugPoint> debug_point);

// more 'add' functions for convenient use
void add(const std::string& name) { add(name, std::make_shared<DebugPoint>()); }
void add_with_params(const std::string& name,
const std::map<std::string, std::string>& params) {
add(name, std::shared_ptr<DebugPoint>(new DebugPoint {.params = params}));
}
template <typename T>
void add_with_value(const std::string& name, const T& value) {
add_with_params(name, {{"value", fmt::format("{}", value)}});
}

static DebugPoints* instance();

private:
Expand Down
40 changes: 0 additions & 40 deletions be/test/util/debug_points_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,46 +54,6 @@ TEST(DebugPointsTest, BaseTest) {
EXPECT_TRUE(DebugPoints::instance()->is_enable("dbug4"));
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
EXPECT_FALSE(DebugPoints::instance()->is_enable("dbug4"));

POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/dbug5?v1=1&v2=a&v3=1.2&v4=true&v5=false");
EXPECT_TRUE(DebugPoints::instance()->is_enable("dbug5"));
DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(1, dp->param<int>("v1", 100)));
DBUG_EXECUTE_IF("dbug5", EXPECT_EQ("a", dp->param<std::string>("v2")));
DBUG_EXECUTE_IF("dbug5", EXPECT_EQ("a", dp->param("v2", std::string())));
DBUG_EXECUTE_IF("dbug5", EXPECT_STREQ("a", dp->param("v2", "b")));
DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(1.2, dp->param<double>("v3")));
DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(1.2, dp->param("v3", 0.0)));
DBUG_EXECUTE_IF("dbug5", EXPECT_TRUE(dp->param("v4", false)));
DBUG_EXECUTE_IF("dbug5", EXPECT_FALSE(dp->param("v5", false)));
DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(0L, dp->param<int64_t>("v_not_exist")));
DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(0L, dp->param("v_not_exist", 0L)));
DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(123, dp->param("v_not_exist", 123)));
DBUG_EXECUTE_IF("dbug5", EXPECT_STREQ("abcd", dp->param("v_not_exist", "abcd")));

EXPECT_EQ(1.2, DebugPoints::instance()->get_debug_param_or_default("dbug5", "v3", 0.0));
EXPECT_EQ(100,
DebugPoints::instance()->get_debug_param_or_default("point_not_exists", "k", 100));

POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/dbug6?value=567");
EXPECT_EQ(567, DebugPoints::instance()->get_debug_param_or_default("dbug6", 0));
}

TEST(DebugPointsTest, AddTest) {
config::enable_debug_points = true;
DebugPoints::instance()->clear();

DebugPoints::instance()->add("dbug1");
EXPECT_TRUE(DebugPoints::instance()->is_enable("dbug1"));

DebugPoints::instance()->add_with_params("dbug2", {{"k1", "100"}});
EXPECT_EQ(100, DebugPoints::instance()->get_debug_param_or_default("dbug2", "k1", 0));

DebugPoints::instance()->add_with_value("dbug3", 567);
EXPECT_EQ(567, DebugPoints::instance()->get_debug_param_or_default("dbug3", 567));

DebugPoints::instance()->add_with_value("dbug4", "hello");
EXPECT_EQ("hello",
DebugPoints::instance()->get_debug_param_or_default<std::string>("dbug4", ""));
}

} // namespace doris
36 changes: 6 additions & 30 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.thrift.TUniqueId;

import com.google.gson.annotations.SerializedName;
Expand Down Expand Up @@ -115,14 +114,6 @@ public enum ReplicaStatus {
private TUniqueId cooldownMetaId;
private long cooldownTerm = -1;

// A replica version should increase monotonically,
// but backend may missing some versions due to disk failure or bugs.
// FE should found these and mark the replica as missing versions.
// If backend's report version < fe version, record the backend's report version as `regressiveVersion`,
// and if time exceed 5min, fe should mark this replica as missing versions.
private long regressiveVersion = -1;
private long regressiveVersionTimestamp = 0;

/*
* This can happen when this replica is created by a balance clone task, and
* when task finished, the version of this replica is behind the partition's visible version.
Expand Down Expand Up @@ -444,9 +435,9 @@ private void updateReplicaInfo(long newVersion,

if (lastFailedVersion != this.lastFailedVersion) {
// Case 2:
if (lastFailedVersion > this.lastFailedVersion || lastFailedVersion < 0) {
if (lastFailedVersion > this.lastFailedVersion) {
this.lastFailedVersion = lastFailedVersion;
this.lastFailedTimestamp = lastFailedVersion > 0 ? System.currentTimeMillis() : -1L;
this.lastFailedTimestamp = System.currentTimeMillis();
}

this.lastSuccessVersion = this.version;
Expand Down Expand Up @@ -515,6 +506,10 @@ public boolean checkVersionCatchUp(long expectedVersion, boolean ignoreAlter) {
return true;
}

public void setLastFailedVersion(long lastFailedVersion) {
this.lastFailedVersion = lastFailedVersion;
}

public void setState(ReplicaState replicaState) {
this.state = replicaState;
}
Expand All @@ -539,25 +534,6 @@ public void setVersionCount(long versionCount) {
this.versionCount = versionCount;
}

public boolean checkVersionRegressive(long newVersion) {
if (newVersion >= version) {
regressiveVersion = -1;
regressiveVersionTimestamp = -1;
return false;
}

if (DebugPointUtil.isEnable("Replica.regressive_version_immediately")) {
return true;
}

if (newVersion != regressiveVersion) {
regressiveVersion = newVersion;
regressiveVersionTimestamp = System.currentTimeMillis();
}

return System.currentTimeMillis() - regressiveVersionTimestamp >= 5 * 60 * 1000L;
}

@Override
public String toString() {
StringBuilder strBuffer = new StringBuilder("[replicaId=");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,22 +392,10 @@ private boolean needSync(Replica replicaInFe, TTabletInfo backendTabletInfo) {
if (backendTabletInfo.getVersion() > versionInFe) {
// backend replica's version is larger or newer than replica in FE, sync it.
return true;
} else if (versionInFe == backendTabletInfo.getVersion()) {
} else if (versionInFe == backendTabletInfo.getVersion() && replicaInFe.isBad()) {
// backend replica's version is equal to replica in FE, but replica in FE is bad,
// while backend replica is good, sync it
if (replicaInFe.isBad()) {
return true;
}

// FE' s replica last failed version > partition's committed version
// this can be occur when be report miss version, fe will set last failed version = visible version + 1
// then last failed version may greater than partition's committed version
//
// But here cannot got variable partition, we just check lastFailedVersion = version + 1,
// In ReportHandler.sync, we will check if last failed version > partition's committed version again.
if (replicaInFe.getLastFailedVersion() == versionInFe + 1) {
return true;
}
return true;
}

return false;
Expand Down Expand Up @@ -515,12 +503,6 @@ private boolean needRecover(Replica replicaInFe, int schemaHashInFe, TTabletInfo
// so we only return true if version_miss is true.
return true;
}

// backend versions regressive due to bugs
if (replicaInFe.checkVersionRegressive(backendTabletInfo.getVersion())) {
return true;
}

return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1074,13 +1074,6 @@ public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request)

replica.updateVersionInfo(reportedTablet.getVersion(), reportedTablet.getDataSize(),
reportedTablet.getDataSize(), reportedTablet.getRowCount());
if (replica.getLastFailedVersion() > partition.getCommittedVersion()
&& reportedTablet.getVersion() >= partition.getCommittedVersion()
//&& !(reportedTablet.isSetVersionMiss() && reportedTablet.isVersionMiss()
&& !(reportedTablet.isSetUsed() && !reportedTablet.isUsed())) {
LOG.info("change replica {} of tablet {} 's last failed version to -1", replica, tabletId);
replica.updateLastFailedVersion(-1L);
}
if (reportedTablet.isSetPathHash()) {
replica.setPathHash(reportedTablet.getPathHash());
}
Expand Down
Loading