Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void HeartbeatServer::heartbeat(
master_info.network_address.hostname.c_str(),
master_info.network_address.port,
master_info.cluster_id);

// Check cluster id
if (_master_info->cluster_id == -1) {
OLAP_LOG_INFO("get first heartbeat. update cluster id");
Expand Down Expand Up @@ -97,7 +97,20 @@ void HeartbeatServer::heartbeat(
_epoch, master_info.epoch);
error_msgs.push_back("epoch is not greater than local. ignore heartbeat.");
status = PALO_ERROR;
}
}
}
}

if (status == PALO_SUCCESS && master_info.__isset.token) {
if (!_master_info->__isset.token) {
_master_info->__set_token(master_info.token);
OLAP_LOG_INFO("get token. token: %s", _master_info->token.c_str());
} else if (_master_info->token != master_info.token) {
OLAP_LOG_WARNING("invalid token. local_token:%s, token:%s",
_master_info->token.c_str(),
master_info.token.c_str());
error_msgs.push_back("invalid token.");
status = PALO_ERROR;
}
}

Expand Down
8 changes: 1 addition & 7 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -997,13 +997,7 @@ AgentStatus TaskWorkerPool::_clone_copy(
AgentStatus status = PALO_SUCCESS;


std::string token;
{
uint32_t cluster_id = OLAPRootPath::get_instance()->effective_cluster_id();
stringstream token_stream;
token_stream << cluster_id;
token = token_stream.str();
}
std::string token = _master_info.token;

for (auto src_backend : clone_req.src_backends) {
stringstream http_host_stream;
Expand Down
11 changes: 1 addition & 10 deletions be/src/http/download_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
#include "util/defer_op.h"
#include "util/file_utils.h"
#include "util/filesystem_util.h"
#include "util/string_parser.hpp"
#include "runtime/exec_env.h"

namespace palo {
Expand Down Expand Up @@ -230,15 +229,7 @@ Status DownloadAction::check_token(HttpRequest *req) {
return Status("token is not specified.");
}

StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS;
int32_t token = StringParser::string_to_int<int32_t>(
token_str.c_str(), token_str.size(), &parse_result);
if (parse_result != StringParser::PARSE_SUCCESS) {
return Status("token format is wrong.");
}

int32_t local_token = static_cast<int32_t>(_exec_env->cluster_id());
if (token != local_token) {
if (token_str != _exec_env->token()) {
return Status("invalid token.");
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/etl_job_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ std::string EtlJobMgr::to_http_path(const std::string& file_name) {
std::stringstream url;
url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
<< "/api/_download_load?"
<< "token=" << _exec_env->cluster_id()
<< "token=" << _exec_env->token()
<< "&file=" << file_name;
return url.str();
}
Expand Down
12 changes: 8 additions & 4 deletions be/src/runtime/exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,6 @@ Status ExecEnv::start_services() {
return Status::OK;
}

uint32_t ExecEnv::cluster_id() {
return OLAPRootPath::get_instance()->effective_cluster_id();
}

Status ExecEnv::start_webserver() {
add_default_path_handlers(_web_page_handler.get(), _mem_tracker.get());
_webserver->register_handler(HttpMethod::PUT,
Expand Down Expand Up @@ -225,4 +221,12 @@ Status ExecEnv::start_webserver() {
return Status::OK;
}

uint32_t ExecEnv::cluster_id() {
return OLAPRootPath::get_instance()->effective_cluster_id();
}

const std::string& ExecEnv::token() const {
return _master_info->token;
}

}
2 changes: 2 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ class ExecEnv {

uint32_t cluster_id();

const std::string& token() const;

DataStreamMgr* stream_mgr() {
return _stream_mgr.get();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ std::string FragmentExecState::to_http_path(const std::string& file_name) {
std::stringstream url;
url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
<< "/api/_download_load?"
<< "token=" << _exec_env->cluster_id()
<< "token=" << _exec_env->token()
<< "&file=" << file_name;
return url.str();
}
Expand Down
91 changes: 75 additions & 16 deletions fe/src/com/baidu/palo/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
import com.baidu.palo.ha.FrontendNodeType;
import com.baidu.palo.ha.HAProtocol;
import com.baidu.palo.ha.MasterInfo;
import com.baidu.palo.http.meta.MetaBaseAction;
import com.baidu.palo.journal.JournalCursor;
import com.baidu.palo.journal.JournalEntity;
import com.baidu.palo.journal.bdbje.Timestamp;
Expand Down Expand Up @@ -152,6 +153,7 @@
import com.google.common.base.Joiner;
import com.google.common.base.Joiner.MapJoiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -255,6 +257,7 @@ public class Catalog {
private String metaDir;
private EditLog editLog;
private int clusterId;
private String token;
// For checkpoint and observer memory replayed marker
private AtomicLong replayedJournalId;

Expand Down Expand Up @@ -437,7 +440,6 @@ private void writeUnlock() {
}

public void initialize(String[] args) throws Exception {

// 0. get local node and helper node info
getSelfHostPort();
checkArgs(args);
Expand Down Expand Up @@ -496,7 +498,8 @@ private void getClusterIdAndRole() throws IOException {
// first node to start
// or when one node to restart.
if (isMyself()) {
if (roleFile.exists() && !versionFile.exists() || !roleFile.exists() && versionFile.exists()) {
if ((roleFile.exists() && !versionFile.exists())
|| (!roleFile.exists() && versionFile.exists())) {
LOG.error("role file and version file must both exist or both not exist. "
+ "please specific one helper node to recover. will exit.");
System.exit(-1);
Expand All @@ -518,8 +521,10 @@ private void getClusterIdAndRole() throws IOException {

if (!versionFile.exists()) {
clusterId = Config.cluster_id == -1 ? Storage.newClusterID() : Config.cluster_id;
storage = new Storage(clusterId, IMAGE_DIR);
storage.writeClusterId();
token = Strings.isNullOrEmpty(Config.auth_token) ?
Storage.newToken() : Config.auth_token;
storage = new Storage(clusterId, token, IMAGE_DIR);
storage.writeClusterIdAndToken();
// If the version file and role file does not exist and the
// helper node is itself,
// it must be the very beginning startup of the cluster
Expand All @@ -534,6 +539,16 @@ private void getClusterIdAndRole() throws IOException {
}
} else {
clusterId = storage.getClusterID();
if (storage.getToken() == null) {
token = Strings.isNullOrEmpty(Config.auth_token) ?
Storage.newToken() : Config.auth_token;
LOG.info("new token={}", token);
storage.setToken(token);
storage.writeClusterIdAndToken();
} else {
token = storage.getToken();
}
isFirstTimeStartUp = false;
}
} else {
// Designate one helper node. Get the roll and version info
Expand Down Expand Up @@ -576,24 +591,44 @@ private void getClusterIdAndRole() throws IOException {
// so we new one.
storage = new Storage(IMAGE_DIR);
clusterId = storage.getClusterID();
token = storage.getToken();
if (Strings.isNullOrEmpty(token)) {
token = Config.auth_token;
}
} else {
// If the version file exist, read the cluster id and check the
// id with helper node to make sure they are identical
clusterId = storage.getClusterID();
token = storage.getToken();
try {
URL idURL = new URL("http://" + helperNode.first + ":" + Config.http_port + "/check");
HttpURLConnection conn = null;
conn = (HttpURLConnection) idURL.openConnection();
conn.setConnectTimeout(2 * 1000);
conn.setReadTimeout(2 * 1000);
String clusterIdString = conn.getHeaderField("cluster_id");
String clusterIdString = conn.getHeaderField(MetaBaseAction.CLUSTER_ID);
int remoteClusterId = Integer.parseInt(clusterIdString);
if (remoteClusterId != clusterId) {
LOG.error("cluster id not equal with node {}. will exit.", helperNode.first);
LOG.error("cluster id is not equal with helper node {}. will exit.", helperNode.first);
System.exit(-1);
}
String remoteToken = conn.getHeaderField(MetaBaseAction.TOKEN);
if (token == null && remoteToken != null) {
LOG.info("get token from helper node. token={}.", remoteToken);
token = remoteToken;
storage.writeClusterIdAndToken();
storage.reload();
}
if (Config.enable_token_check) {
Preconditions.checkNotNull(token);
Preconditions.checkNotNull(remoteToken);
if (!token.equals(remoteToken)) {
LOG.error("token is not equal with helper node {}. will exit.", helperNode.first);
System.exit(-1);
}
}
} catch (Exception e) {
LOG.warn("fail to check cluster_id from helper node.", e);
LOG.warn("fail to check cluster_id and token with helper node.", e);
System.exit(-1);
}
}
Expand Down Expand Up @@ -710,7 +745,8 @@ private void transferToMaster() throws IOException {
LOG.info("checkpointer thread started. thread id is {}", checkpointThreadId);

// ClusterInfoService
Catalog.getCurrentSystemInfo().setMaster(FrontendOptions.getLocalHostAddress(), Config.rpc_port, clusterId, epoch);
Catalog.getCurrentSystemInfo().setMaster(
FrontendOptions.getLocalHostAddress(), Config.rpc_port, clusterId, token, epoch);
Catalog.getCurrentSystemInfo().start();

pullLoadJobMgr.start();
Expand Down Expand Up @@ -832,8 +868,7 @@ private void getNewImage() throws IOException {
long version = info.getImageSeq();
if (version > localImageVersion) {
String url = "http://" + helperNode.first + ":" + Config.http_port
+ "/image?version=" + version
+ "&token=" + clusterId;
+ "/image?version=" + version;
String filename = Storage.IMAGE + "." + version;
File dir = new File(IMAGE_DIR);
MetaHelper.getRemoteFile(url, HTTP_TIMEOUT_SECOND * 1000, MetaHelper.getOutputStream(filename, dir));
Expand Down Expand Up @@ -1858,19 +1893,43 @@ public void dropFrontend(FrontendNodeType role, String host, int port) throws Dd
}

public Frontend checkFeExist(String host, int port) {
for (Frontend fe : frontends) {
if (fe.getHost().equals(host) && fe.getPort() == port) {
return fe;
readLock();
try {
for (Frontend fe : frontends) {
if (fe.getHost().equals(host) && fe.getPort() == port) {
return fe;
}
}
} finally {
readUnlock();
}
return null;
}

public Frontend checkFeRemoved(String host, int port) {
for (Frontend fe : removedFrontends) {
if (fe.getHost().equals(host) && fe.getPort() == port) {
return fe;
readLock();
try {
for (Frontend fe : removedFrontends) {
if (fe.getHost().equals(host) && fe.getPort() == port) {
return fe;
}
}
} finally {
readUnlock();
}
return null;
}

public Frontend getFeByHost(String host) {
readLock();
try {
for (Frontend fe : frontends) {
if (fe.getHost().equals(host)) {
return fe;
}
}
} finally {
readUnlock();
}
return null;
}
Expand Down
4 changes: 4 additions & 0 deletions fe/src/com/baidu/palo/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ public class Config extends ConfigBase {
* You can also sepecify one.
*/
@ConfField public static int cluster_id = -1;
/*
* Cluster token used for internal authentication.
*/
@ConfField public static String auth_token = "";

// Configurations for load, clone, create table, alter table etc. We will rarely change them
/*
Expand Down
Loading