From 955be2526a5dea58627a3b81dcd8b7deea1af927 Mon Sep 17 00:00:00 2001 From: lingbin Date: Wed, 20 Sep 2017 20:35:23 +0800 Subject: [PATCH] make UUID to be authentication token --- be/src/agent/heartbeat_server.cpp | 17 +- be/src/agent/task_worker_pool.cpp | 8 +- be/src/http/download_action.cpp | 11 +- be/src/runtime/etl_job_mgr.cpp | 2 +- be/src/runtime/exec_env.cpp | 12 +- be/src/runtime/exec_env.h | 2 + be/src/runtime/fragment_mgr.cpp | 2 +- fe/src/com/baidu/palo/catalog/Catalog.java | 91 +- fe/src/com/baidu/palo/common/Config.java | 4 + fe/src/com/baidu/palo/http/HttpServer.java | 26 +- .../http/meta/InvalidClientException.java | 24 + .../baidu/palo/http/meta/MetaBaseAction.java | 57 +- .../com/baidu/palo/http/meta/MetaService.java | 59 +- fe/src/com/baidu/palo/persist/Storage.java | 101 +- .../palo/service/FrontendServiceImpl.java | 4 +- .../baidu/palo/system/SystemInfoService.java | 1637 ++++++++--------- .../com/baidu/palo/persist/StorageTest.java | 272 +-- gensrc/thrift/HeartbeatService.thrift | 1 + 18 files changed, 1212 insertions(+), 1118 deletions(-) create mode 100644 fe/src/com/baidu/palo/http/meta/InvalidClientException.java diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp index e0f99cf941b8ce..1823b13c45cebc 100644 --- a/be/src/agent/heartbeat_server.cpp +++ b/be/src/agent/heartbeat_server.cpp @@ -52,7 +52,7 @@ void HeartbeatServer::heartbeat( master_info.network_address.hostname.c_str(), master_info.network_address.port, master_info.cluster_id); - + // Check cluster id if (_master_info->cluster_id == -1) { OLAP_LOG_INFO("get first heartbeat. update cluster id"); @@ -97,7 +97,20 @@ void HeartbeatServer::heartbeat( _epoch, master_info.epoch); error_msgs.push_back("epoch is not greater than local. 
ignore heartbeat."); status = PALO_ERROR; - } + } + } + } + + if (status == PALO_SUCCESS && master_info.__isset.token) { + if (!_master_info->__isset.token) { + _master_info->__set_token(master_info.token); + OLAP_LOG_INFO("get token. token: %s", _master_info->token.c_str()); + } else if (_master_info->token != master_info.token) { + OLAP_LOG_WARNING("invalid token. local_token:%s, token:%s", + _master_info->token.c_str(), + master_info.token.c_str()); + error_msgs.push_back("invalid token."); + status = PALO_ERROR; } } diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 7523750e50fd36..c5c6afa9e037ce 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -997,13 +997,7 @@ AgentStatus TaskWorkerPool::_clone_copy( AgentStatus status = PALO_SUCCESS; - std::string token; - { - uint32_t cluster_id = OLAPRootPath::get_instance()->effective_cluster_id(); - stringstream token_stream; - token_stream << cluster_id; - token = token_stream.str(); - } + std::string token = _master_info.token; for (auto src_backend : clone_req.src_backends) { stringstream http_host_stream; diff --git a/be/src/http/download_action.cpp b/be/src/http/download_action.cpp index 39eb29dacd148f..fe0b5822c49c17 100644 --- a/be/src/http/download_action.cpp +++ b/be/src/http/download_action.cpp @@ -29,7 +29,6 @@ #include "util/defer_op.h" #include "util/file_utils.h" #include "util/filesystem_util.h" -#include "util/string_parser.hpp" #include "runtime/exec_env.h" namespace palo { @@ -230,15 +229,7 @@ Status DownloadAction::check_token(HttpRequest *req) { return Status("token is not specified."); } - StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS; - int32_t token = StringParser::string_to_int( - token_str.c_str(), token_str.size(), &parse_result); - if (parse_result != StringParser::PARSE_SUCCESS) { - return Status("token format is wrong."); - } - - int32_t local_token = static_cast(_exec_env->cluster_id()); - 
if (token != local_token) { + if (token_str != _exec_env->token()) { return Status("invalid token."); } diff --git a/be/src/runtime/etl_job_mgr.cpp b/be/src/runtime/etl_job_mgr.cpp index e7fa33c769907d..7e35c8453b7e1c 100644 --- a/be/src/runtime/etl_job_mgr.cpp +++ b/be/src/runtime/etl_job_mgr.cpp @@ -40,7 +40,7 @@ std::string EtlJobMgr::to_http_path(const std::string& file_name) { std::stringstream url; url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port << "/api/_download_load?" - << "token=" << _exec_env->cluster_id() + << "token=" << _exec_env->token() << "&file=" << file_name; return url.str(); } diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index 4a73516e06e40e..6533d55d255431 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -169,10 +169,6 @@ Status ExecEnv::start_services() { return Status::OK; } -uint32_t ExecEnv::cluster_id() { - return OLAPRootPath::get_instance()->effective_cluster_id(); -} - Status ExecEnv::start_webserver() { add_default_path_handlers(_web_page_handler.get(), _mem_tracker.get()); _webserver->register_handler(HttpMethod::PUT, @@ -225,4 +221,12 @@ Status ExecEnv::start_webserver() { return Status::OK; } +uint32_t ExecEnv::cluster_id() { + return OLAPRootPath::get_instance()->effective_cluster_id(); +} + +const std::string& ExecEnv::token() const { + return _master_info->token; +} + } diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 82b1fcf782fb6a..7f4539e5b9720d 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -76,6 +76,8 @@ class ExecEnv { uint32_t cluster_id(); + const std::string& token() const; + DataStreamMgr* stream_mgr() { return _stream_mgr.get(); } diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 97988ba18af3a7..57475efe8109ce 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -211,7 +211,7 @@ std::string 
FragmentExecState::to_http_path(const std::string& file_name) { std::stringstream url; url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port << "/api/_download_load?" - << "token=" << _exec_env->cluster_id() + << "token=" << _exec_env->token() << "&file=" << file_name; return url.str(); } diff --git a/fe/src/com/baidu/palo/catalog/Catalog.java b/fe/src/com/baidu/palo/catalog/Catalog.java index 990fdb75980f5c..26011785b83d36 100644 --- a/fe/src/com/baidu/palo/catalog/Catalog.java +++ b/fe/src/com/baidu/palo/catalog/Catalog.java @@ -102,6 +102,7 @@ import com.baidu.palo.ha.FrontendNodeType; import com.baidu.palo.ha.HAProtocol; import com.baidu.palo.ha.MasterInfo; +import com.baidu.palo.http.meta.MetaBaseAction; import com.baidu.palo.journal.JournalCursor; import com.baidu.palo.journal.JournalEntity; import com.baidu.palo.journal.bdbje.Timestamp; @@ -152,6 +153,7 @@ import com.google.common.base.Joiner; import com.google.common.base.Joiner.MapJoiner; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.collect.HashMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -255,6 +257,7 @@ public class Catalog { private String metaDir; private EditLog editLog; private int clusterId; + private String token; // For checkpoint and observer memory replayed marker private AtomicLong replayedJournalId; @@ -437,7 +440,6 @@ private void writeUnlock() { } public void initialize(String[] args) throws Exception { - // 0. get local node and helper node info getSelfHostPort(); checkArgs(args); @@ -496,7 +498,8 @@ private void getClusterIdAndRole() throws IOException { // first node to start // or when one node to restart. 
if (isMyself()) { - if (roleFile.exists() && !versionFile.exists() || !roleFile.exists() && versionFile.exists()) { + if ((roleFile.exists() && !versionFile.exists()) + || (!roleFile.exists() && versionFile.exists())) { LOG.error("role file and version file must both exist or both not exist. " + "please specific one helper node to recover. will exit."); System.exit(-1); @@ -518,8 +521,10 @@ private void getClusterIdAndRole() throws IOException { if (!versionFile.exists()) { clusterId = Config.cluster_id == -1 ? Storage.newClusterID() : Config.cluster_id; - storage = new Storage(clusterId, IMAGE_DIR); - storage.writeClusterId(); + token = Strings.isNullOrEmpty(Config.auth_token) ? + Storage.newToken() : Config.auth_token; + storage = new Storage(clusterId, token, IMAGE_DIR); + storage.writeClusterIdAndToken(); // If the version file and role file does not exist and the // helper node is itself, // it must be the very beginning startup of the cluster @@ -534,6 +539,16 @@ private void getClusterIdAndRole() throws IOException { } } else { clusterId = storage.getClusterID(); + if (storage.getToken() == null) { + token = Strings.isNullOrEmpty(Config.auth_token) ? + Storage.newToken() : Config.auth_token; + LOG.info("new token={}", token); + storage.setToken(token); + storage.writeClusterIdAndToken(); + } else { + token = storage.getToken(); + } + isFirstTimeStartUp = false; } } else { // Designate one helper node. Get the roll and version info @@ -576,24 +591,44 @@ private void getClusterIdAndRole() throws IOException { // so we new one. 
storage = new Storage(IMAGE_DIR); clusterId = storage.getClusterID(); + token = storage.getToken(); + if (Strings.isNullOrEmpty(token)) { + token = Config.auth_token; + } } else { // If the version file exist, read the cluster id and check the // id with helper node to make sure they are identical clusterId = storage.getClusterID(); + token = storage.getToken(); try { URL idURL = new URL("http://" + helperNode.first + ":" + Config.http_port + "/check"); HttpURLConnection conn = null; conn = (HttpURLConnection) idURL.openConnection(); conn.setConnectTimeout(2 * 1000); conn.setReadTimeout(2 * 1000); - String clusterIdString = conn.getHeaderField("cluster_id"); + String clusterIdString = conn.getHeaderField(MetaBaseAction.CLUSTER_ID); int remoteClusterId = Integer.parseInt(clusterIdString); if (remoteClusterId != clusterId) { - LOG.error("cluster id not equal with node {}. will exit.", helperNode.first); + LOG.error("cluster id is not equal with helper node {}. will exit.", helperNode.first); System.exit(-1); } + String remoteToken = conn.getHeaderField(MetaBaseAction.TOKEN); + if (token == null && remoteToken != null) { + LOG.info("get token from helper node. token={}.", remoteToken); + token = remoteToken; + storage.setToken(token); storage.writeClusterIdAndToken(); + storage.reload(); + } + if (Config.enable_token_check) { + Preconditions.checkNotNull(token); + Preconditions.checkNotNull(remoteToken); + if (!token.equals(remoteToken)) { + LOG.error("token is not equal with helper node {}. will exit.", helperNode.first); + System.exit(-1); + } + } } catch (Exception e) { - LOG.warn("fail to check cluster_id from helper node.", e); + LOG.warn("fail to check cluster_id and token with helper node.", e); System.exit(-1); } } @@ -710,7 +745,8 @@ private void transferToMaster() throws IOException { LOG.info("checkpointer thread started. 
thread id is {}", checkpointThreadId); // ClusterInfoService - Catalog.getCurrentSystemInfo().setMaster(FrontendOptions.getLocalHostAddress(), Config.rpc_port, clusterId, epoch); + Catalog.getCurrentSystemInfo().setMaster( + FrontendOptions.getLocalHostAddress(), Config.rpc_port, clusterId, token, epoch); Catalog.getCurrentSystemInfo().start(); pullLoadJobMgr.start(); @@ -832,8 +868,7 @@ private void getNewImage() throws IOException { long version = info.getImageSeq(); if (version > localImageVersion) { String url = "http://" + helperNode.first + ":" + Config.http_port - + "/image?version=" + version - + "&token=" + clusterId; + + "/image?version=" + version; String filename = Storage.IMAGE + "." + version; File dir = new File(IMAGE_DIR); MetaHelper.getRemoteFile(url, HTTP_TIMEOUT_SECOND * 1000, MetaHelper.getOutputStream(filename, dir)); @@ -1858,19 +1893,43 @@ public void dropFrontend(FrontendNodeType role, String host, int port) throws Dd } public Frontend checkFeExist(String host, int port) { - for (Frontend fe : frontends) { - if (fe.getHost().equals(host) && fe.getPort() == port) { - return fe; + readLock(); + try { + for (Frontend fe : frontends) { + if (fe.getHost().equals(host) && fe.getPort() == port) { + return fe; + } } + } finally { + readUnlock(); } return null; } public Frontend checkFeRemoved(String host, int port) { - for (Frontend fe : removedFrontends) { - if (fe.getHost().equals(host) && fe.getPort() == port) { - return fe; + readLock(); + try { + for (Frontend fe : removedFrontends) { + if (fe.getHost().equals(host) && fe.getPort() == port) { + return fe; + } + } + } finally { + readUnlock(); + } + return null; + } + + public Frontend getFeByHost(String host) { + readLock(); + try { + for (Frontend fe : frontends) { + if (fe.getHost().equals(host)) { + return fe; + } } + } finally { + readUnlock(); } return null; } diff --git a/fe/src/com/baidu/palo/common/Config.java b/fe/src/com/baidu/palo/common/Config.java index 
115714b62e2192..6f6da33ee59b38 100644 --- a/fe/src/com/baidu/palo/common/Config.java +++ b/fe/src/com/baidu/palo/common/Config.java @@ -175,6 +175,10 @@ public class Config extends ConfigBase { * You can also sepecify one. */ @ConfField public static int cluster_id = -1; + /* + * Cluster token used for internal authentication. + */ + @ConfField public static String auth_token = ""; // Configurations for load, clone, create table, alter table etc. We will rarely change them /* diff --git a/fe/src/com/baidu/palo/http/HttpServer.java b/fe/src/com/baidu/palo/http/HttpServer.java index bfe4adb5ad27a2..a4e1f02dbcd047 100755 --- a/fe/src/com/baidu/palo/http/HttpServer.java +++ b/fe/src/com/baidu/palo/http/HttpServer.java @@ -28,7 +28,6 @@ import com.baidu.palo.http.action.VariableAction; import com.baidu.palo.http.meta.MetaService.CheckAction; import com.baidu.palo.http.meta.MetaService.DumpAction; -import com.baidu.palo.http.meta.MetaService.EditsAction; import com.baidu.palo.http.meta.MetaService.ImageAction; import com.baidu.palo.http.meta.MetaService.InfoAction; import com.baidu.palo.http.meta.MetaService.JournalIdAction; @@ -63,6 +62,9 @@ import com.baidu.palo.master.MetaHelper; import com.baidu.palo.qe.QeService; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; @@ -74,9 +76,6 @@ import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.stream.ChunkedWriteHandler; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import java.io.File; public class HttpServer { @@ -85,9 +84,9 @@ public class HttpServer { private QeService qeService = null; private int port; private ActionController controller; - + private Thread serverThread; - + public HttpServer(QeService qeService, int port) { this.qeService = qeService; this.port = port; @@ -97,7 +96,7 @@ public 
HttpServer(QeService qeService, int port) { public void setup() throws IllegalArgException { registerActions(); } - + private void registerActions() throws IllegalArgException { // add rest action LoadAction.registerAction(controller); @@ -106,7 +105,7 @@ private void registerActions() throws IllegalArgException { SetConfigAction.registerAction(controller); GetDdlStmtAction.registerAction(controller); MigrationAction.registerAction(controller); - + // add web action IndexAction.registerAction(controller); SystemAction.registerAction(controller); @@ -142,7 +141,6 @@ private void registerActions() throws IllegalArgException { // meta service action File imageDir = MetaHelper.getMasterImageDir(); ImageAction.registerAction(controller, imageDir); - EditsAction.registerAction(controller, imageDir); InfoAction.registerAction(controller, imageDir); VersionAction.registerAction(controller, imageDir); PutAction.registerAction(controller, imageDir); @@ -158,12 +156,12 @@ private void registerActions() throws IllegalArgException { BootstrapFinishAction.registerAction(controller); } - + public void start() { serverThread = new Thread(new HttpServerThread(), "FE Http Server"); serverThread.start(); } - + protected class PaloHttpServerInitializer extends ChannelInitializer { @Override protected void initChannel(SocketChannel ch) throws Exception { @@ -173,7 +171,7 @@ protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new HttpServerHandler(controller, qeService)); } } - + private class HttpServerThread implements Runnable { @Override public void run() { @@ -197,7 +195,7 @@ public void run() { } } } - + public static void main(String[] args) throws Exception { QeService qeService = new QeService(9030); HttpServer httpServer = new HttpServer(qeService, 8080); @@ -205,7 +203,7 @@ public static void main(String[] args) throws Exception { System.out.println("before start http server."); httpServer.start(); System.out.println("after start http 
server."); - + while (true) { Thread.sleep(2000); } diff --git a/fe/src/com/baidu/palo/http/meta/InvalidClientException.java b/fe/src/com/baidu/palo/http/meta/InvalidClientException.java new file mode 100644 index 00000000000000..2445aa964c6210 --- /dev/null +++ b/fe/src/com/baidu/palo/http/meta/InvalidClientException.java @@ -0,0 +1,24 @@ +// Copyright (c) 2017, Baidu.com, Inc. All Rights Reserved + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.baidu.palo.http.meta; + +import com.baidu.palo.common.DdlException; + +public class InvalidClientException extends DdlException { + public InvalidClientException(String msg) { + super(msg); + } +} diff --git a/fe/src/com/baidu/palo/http/meta/MetaBaseAction.java b/fe/src/com/baidu/palo/http/meta/MetaBaseAction.java index 8bd7138f15e2c4..42a882e3814b89 100644 --- a/fe/src/com/baidu/palo/http/meta/MetaBaseAction.java +++ b/fe/src/com/baidu/palo/http/meta/MetaBaseAction.java @@ -15,43 +15,88 @@ package com.baidu.palo.http.meta; +import com.baidu.palo.catalog.Catalog; import com.baidu.palo.http.ActionController; import com.baidu.palo.http.BaseRequest; import com.baidu.palo.http.BaseResponse; import com.baidu.palo.http.action.WebBaseAction; import com.baidu.palo.master.MetaHelper; +import com.baidu.palo.system.Frontend; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import io.netty.handler.codec.http.HttpResponseStatus; import java.io.File; public class 
MetaBaseAction extends WebBaseAction { + private static final Logger LOG = LogManager.getLogger(MetaBaseAction.class); private static String CONTENT_DISPOSITION = "Content-disposition"; - + + public static final String CLUSTER_ID = "cluster_id"; + public static final String TOKEN = "token"; + protected File imageDir; - + public MetaBaseAction(ActionController controller, File imageDir) { super(controller); this.imageDir = imageDir; } - + @Override public boolean needAdmin() { return false; } - + + @Override + public void execute(BaseRequest request, BaseResponse response) { + if (needCheckClientIsFe()) { + try { + checkFromValidFe(request, response); + } catch (InvalidClientException e) { + response.appendContent("invalid client host."); + writeResponse(request, response, HttpResponseStatus.BAD_REQUEST); + return; + } + } + // always run the action once the origin check (if any) has passed + super.execute(request, response); + } + + protected boolean needCheckClientIsFe() { + return true; + } + protected void writeFileResponse(BaseRequest request, BaseResponse response, File file) { if (file == null || !file.exists()) { response.appendContent("File not exist."); writeResponse(request, response, HttpResponseStatus.NOT_FOUND); return; } - + // add customed header response.addHeader(CONTENT_DISPOSITION, "attachment; filename=" + file.getName()); response.addHeader(MetaHelper.X_IMAGE_SIZE, String.valueOf(file.length())); - + writeFileResponse(request, response, HttpResponseStatus.OK, file); return; } + + private boolean isFromValidFe(BaseRequest request) { + String clientHost = request.getHostString(); + Frontend fe = Catalog.getInstance().getFeByHost(clientHost); + if (fe == null) { + LOG.warn("request is not from valid FE . 
client: {}", clientHost); + return false; + } + return true; + } + + private void checkFromValidFe(BaseRequest request, BaseResponse response) + throws InvalidClientException { + if (!isFromValidFe(request)) { + throw new InvalidClientException("invalid client host"); + } + } } diff --git a/fe/src/com/baidu/palo/http/meta/MetaService.java index 2cb5c3ae53fdc8..c9e265deda2d02 100644 --- a/fe/src/com/baidu/palo/http/meta/MetaService.java +++ b/fe/src/com/baidu/palo/http/meta/MetaService.java @@ -53,7 +53,6 @@ public class MetaService { public static class ImageAction extends MetaBaseAction { private static final String VERSION = "version"; - private static final String TOKEN = "token"; public ImageAction(ActionController controller, File imageDir) { super(controller, imageDir); @@ -79,21 +78,6 @@ public void executeGet(BaseRequest request, BaseResponse response) { return; } - if (Config.enable_token_check) { - String tokenStr = request.getSingleParameter(TOKEN); - if (Strings.isNullOrEmpty(tokenStr)) { - response.appendContent("Miss token parameter"); - writeResponse(request, response, HttpResponseStatus.BAD_REQUEST); - return; - } - int token = checkIntParam(tokenStr); - if (token != catalog.getClusterId()) { - response.appendContent("wrong token."); - writeResponse(request, response, HttpResponseStatus.BAD_REQUEST); - return; - } - } - File imageFile = Storage.getImageFile(imageDir, version); if (!imageFile.exists()) { writeResponse(request, response, HttpResponseStatus.NOT_FOUND); @@ -104,34 +88,6 @@ public void executeGet(BaseRequest request, BaseResponse response) { } } - public static class EditsAction extends MetaBaseAction { - private static final String SEQ = "seq"; - - public EditsAction(ActionController controller, File imageDir) { - super(controller, imageDir); - } - - public static void registerAction(ActionController controller, File imageDir) - throws IllegalArgException { - 
controller.registerHandler(HttpMethod.GET, "/edits", new EditsAction(controller, imageDir)); - } - - @Override - public void executeGet(BaseRequest request, BaseResponse response) { - String strSeq = request.getSingleParameter(SEQ); - File edits = null; - - if (Strings.isNullOrEmpty(strSeq)) { - long seq = Long.parseLong(strSeq); - edits = Storage.getEditsFile(imageDir, seq); - } else { - edits = Storage.getCurrentEditsFile(imageDir); - } - - writeFileResponse(request, response, edits); - } - } - public static class InfoAction extends MetaBaseAction { private static final Logger LOG = LogManager.getLogger(InfoAction.class); @@ -158,7 +114,7 @@ public void executeGet(BaseRequest request, BaseResponse response) { return; } catch (IOException e) { LOG.warn("IO error.", e); - response.appendContent("Miss version parameter"); + response.appendContent("failed to get master info."); writeResponse(request, response, HttpResponseStatus.BAD_REQUEST); return; } @@ -166,6 +122,7 @@ public void executeGet(BaseRequest request, BaseResponse response) { } public static class VersionAction extends MetaBaseAction { + private static final Logger LOG = LogManager.getLogger(VersionAction.class); public VersionAction(ActionController controller, File imageDir) { super(controller, imageDir); @@ -225,8 +182,7 @@ public void executeGet(BaseRequest request, BaseResponse response) { checkLongParam(versionStr); String url = "http://" + machine + ":" + portStr - + "/image?version=" + versionStr - + "&token=" + catalog.getClusterId(); + + "/image?version=" + versionStr; String filename = Storage.IMAGE + "." 
+ versionStr; File dir = new File(Catalog.IMAGE_DIR); @@ -332,9 +288,9 @@ public static void registerAction (ActionController controller, File imageDir) public void executeGet(BaseRequest request, BaseResponse response) { try { Storage storage = new Storage(imageDir.getAbsolutePath()); - response.addHeader("cluster_id", Integer.toString(storage.getClusterID())); + response.addHeader(MetaBaseAction.CLUSTER_ID, Integer.toString(storage.getClusterID())); + response.addHeader(MetaBaseAction.TOKEN, storage.getToken()); } catch (IOException e) { - e.printStackTrace(); LOG.error(e); } writeResponse(request, response); @@ -358,6 +314,11 @@ public boolean needAdmin() { return true; } + @Override + protected boolean needCheckClientIsFe() { + return false; + } + @Override public void executeGet(BaseRequest request, BaseResponse response) { /* diff --git a/fe/src/com/baidu/palo/persist/Storage.java b/fe/src/com/baidu/palo/persist/Storage.java index a558a4d1429e3e..393b185b5a45e6 100644 --- a/fe/src/com/baidu/palo/persist/Storage.java +++ b/fe/src/com/baidu/palo/persist/Storage.java @@ -15,11 +15,14 @@ package com.baidu.palo.persist; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.LogManager; - import com.baidu.palo.ha.FrontendNodeType; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -30,11 +33,13 @@ import java.util.List; import java.util.Properties; import java.util.Random; +import java.util.UUID; public class Storage { private static final Logger LOG = LogManager.getLogger(Storage.class); public static final String CLUSTER_ID = "clusterId"; + public static final String TOKEN = "token"; public static final String FRONTEND_ROLE = "role"; public static final String EDITS = "edits"; public static final String IMAGE = "image"; @@ -42,29 
+47,41 @@ public class Storage { public static final String VERSION_FILE = "VERSION"; public static final String ROLE_FILE = "ROLE"; - private int clusterID; + private int clusterID = 0; + private String token; private FrontendNodeType role = FrontendNodeType.UNKNOWN; private long editsSeq; private long imageSeq; private String metaDir; private List editsFileSequenceNumbers; - - public Storage(int clusterID, String metaDir) { + + public Storage(int clusterID, String token, String metaDir) { this.clusterID = clusterID; + this.token = token; this.metaDir = metaDir; } - public Storage(int clusterID, long imageSeq, long editsSeq, String metaDir) { + public Storage(int clusterID, String token, long imageSeq, long editsSeq, String metaDir) { this.clusterID = clusterID; + this.token = token; this.editsSeq = editsSeq; this.imageSeq = imageSeq; this.metaDir = metaDir; } - + public Storage(String metaDir) throws IOException { this.editsFileSequenceNumbers = new ArrayList(); this.metaDir = metaDir; + reload(); + } + + public List getEditsFileSequenceNumbers() { + Collections.sort(editsFileSequenceNumbers); + return this.editsFileSequenceNumbers; + } + + public void reload() throws IOException { // Read version file info Properties prop = new Properties(); File versionFile = getVersionFile(); @@ -73,8 +90,11 @@ public Storage(String metaDir) throws IOException { prop.load(in); in.close(); clusterID = Integer.parseInt(prop.getProperty(CLUSTER_ID)); + if (prop.getProperty(TOKEN) != null) { + token = prop.getProperty(TOKEN); + } } - + File roleFile = getRoleFile(); if (roleFile.isFile()) { FileInputStream in = new FileInputStream(roleFile); @@ -92,7 +112,7 @@ public Storage(String metaDir) throws IOException { for (File child : children) { String name = child.getName(); try { - if (!name.equals(EDITS) && !name.equals(IMAGE_NEW) + if (!name.equals(EDITS) && !name.equals(IMAGE_NEW) && !name.endsWith(".part") && name.contains(".")) { if (name.startsWith(IMAGE)) { imageSeq = 
Math.max(Long.parseLong(name.substring(name.lastIndexOf('.') + 1)), imageSeq); @@ -107,11 +127,7 @@ public Storage(String metaDir) throws IOException { } } } - } - - public List getEditsFileSequenceNumbers() { - Collections.sort(editsFileSequenceNumbers); - return this.editsFileSequenceNumbers; + } public int getClusterID() { @@ -121,7 +137,15 @@ public int getClusterID() { public void setClusterID(int clusterID) { this.clusterID = clusterID; } - + + public String getToken() { + return token; + } + + public void setToken(String token) { + this.token = token; + } + public FrontendNodeType getRole() { return role; } @@ -161,43 +185,20 @@ public static int newClusterID() { return newID; } - public void read() throws IOException { - read(getVersionFile()); - } - - public void read(File from) throws IOException { - RandomAccessFile file = new RandomAccessFile(from, "rws"); - FileInputStream in = null; - - try { - in = new FileInputStream(file.getFD()); - file.seek(0); - Properties properties = new Properties(); - properties.load(in); - getFields(properties); - } finally { - if (in != null) { - in.close(); - } - file.close(); - } + public static String newToken() { + return UUID.randomUUID().toString(); } - public void getFields(Properties properties) throws IOException { - String tmpClusterID = properties.getProperty(CLUSTER_ID); + private void setFields(Properties properties) throws IOException { + Preconditions.checkState(clusterID > 0); + properties.setProperty(CLUSTER_ID, String.valueOf(clusterID)); - if (tmpClusterID == null) { - throw new IOException("File " + VERSION_FILE + " is invalid."); + if (!Strings.isNullOrEmpty(token)) { + properties.setProperty(TOKEN, token); } - - clusterID = Integer.parseInt(tmpClusterID); } - public void setFields(Properties properties) throws IOException { - properties.setProperty(CLUSTER_ID, String.valueOf(clusterID)); - } - - public void writeClusterId() throws IOException { + public void writeClusterIdAndToken() throws IOException 
{ Properties properties = new Properties(); setFields(properties); @@ -216,7 +217,7 @@ public void writeClusterId() throws IOException { file.close(); } } - + public void writeFrontendRole(FrontendNodeType role) throws IOException { Properties properties = new Properties(); properties.setProperty(FRONTEND_ROLE, role.name()); @@ -277,7 +278,7 @@ public static File getImageFile(File dir, long version) { public File getVersionFile() { return new File(metaDir, VERSION_FILE); } - + public File getRoleFile() { return new File(metaDir, ROLE_FILE); } @@ -302,5 +303,5 @@ public static long getMetaSeq(File file) { String filename = file.getName(); return Long.parseLong(filename.substring(filename.lastIndexOf('.') + 1)); } - + } diff --git a/fe/src/com/baidu/palo/service/FrontendServiceImpl.java b/fe/src/com/baidu/palo/service/FrontendServiceImpl.java index d76bf30bd6c24d..f5cf4acbd8c248 100644 --- a/fe/src/com/baidu/palo/service/FrontendServiceImpl.java +++ b/fe/src/com/baidu/palo/service/FrontendServiceImpl.java @@ -407,9 +407,7 @@ public TMasterOpResult forward(TMasterOpRequest params) throws TException { if (connectionContext != null) { TNetworkAddress clientAddress = connectionContext.getClient(); - Frontend fe = Catalog.getInstance().checkFeExist( - clientAddress.getHostname(), - clientAddress.getPort()); + Frontend fe = Catalog.getInstance().getFeByHost(clientAddress.getHostname()); if (fe == null) { LOG.warn("reject request from invalid host. client: {}", clientAddress); throw new TException("request from invalid host was rejected."); diff --git a/fe/src/com/baidu/palo/system/SystemInfoService.java b/fe/src/com/baidu/palo/system/SystemInfoService.java index 5fde8ccc7e8f98..7adc08fd2a0ba4 100644 --- a/fe/src/com/baidu/palo/system/SystemInfoService.java +++ b/fe/src/com/baidu/palo/system/SystemInfoService.java @@ -13,7 +13,7 @@ // specific language governing permissions and limitations // under the License. 
-package com.baidu.palo.system; +package com.baidu.palo.system; import com.baidu.palo.catalog.Catalog; import com.baidu.palo.catalog.Database; @@ -63,34 +63,34 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -public class SystemInfoService extends Daemon { - public static final String DEFAULT_CLUSTER = "default_cluster"; - private static final Logger LOG = LogManager.getLogger(SystemInfoService.class); - - private volatile AtomicReference> idToBackendRef; - private volatile AtomicReference> idToHeartbeatHandlerRef; - private volatile AtomicReference> idToReportVersionRef; // no - // need - // to - // persist - - private final ExecutorService executor; - - private final EventBus eventBus; - - private static volatile AtomicReference masterInfo = new AtomicReference(); - - // last backend id used by round robin for sequential choosing backends for - // tablet creation - private ConcurrentHashMap lastBackendIdForCreationMap; - // last backend id used by round robin for sequential choosing backends in - // other jobs - private ConcurrentHashMap lastBackendIdForOtherMap; - - private long lastBackendIdForCreation = -1; - private long lastBackendIdForOther = -1; +import java.util.concurrent.atomic.AtomicReference; + +public class SystemInfoService extends Daemon { + public static final String DEFAULT_CLUSTER = "default_cluster"; + private static final Logger LOG = LogManager.getLogger(SystemInfoService.class); + + private volatile AtomicReference> idToBackendRef; + private volatile AtomicReference> idToHeartbeatHandlerRef; + private volatile AtomicReference> idToReportVersionRef; // no + // need + // to + // persist + + private final ExecutorService executor; + + private final EventBus eventBus; + + private static volatile AtomicReference masterInfo = new AtomicReference(); + + // last backend id used by round robin for sequential 
choosing backends for + // tablet creation + private ConcurrentHashMap lastBackendIdForCreationMap; + // last backend id used by round robin for sequential choosing backends in + // other jobs + private ConcurrentHashMap lastBackendIdForOtherMap; + + private long lastBackendIdForCreation = -1; + private long lastBackendIdForOther = -1; // sort host backends list by num of backends, descending private static final Comparator> hostBackendsListComparator = new Comparator> (){ @@ -100,78 +100,79 @@ public int compare(List list1, List list2) { return -1; } else { return 1; - } - } + } + } }; - public SystemInfoService() { - super("cluster info service", FeConstants.heartbeat_interval_second * 1000); - idToBackendRef = new AtomicReference>(ImmutableMap. of()); - idToHeartbeatHandlerRef = new AtomicReference>( - ImmutableMap. of()); - idToReportVersionRef = new AtomicReference>( - ImmutableMap. of()); - - executor = Executors.newCachedThreadPool(); - - eventBus = new EventBus("backendEvent"); - - lastBackendIdForCreationMap = new ConcurrentHashMap(); - lastBackendIdForOtherMap = new ConcurrentHashMap(); - } - - public EventBus getEventBus() { - return this.eventBus; - } - - public void setMaster(String masterHost, int masterPort, int clusterId, long epoch) { - TMasterInfo tMasterInfo = new TMasterInfo(new TNetworkAddress(masterHost, masterPort), clusterId, epoch); - masterInfo.set(tMasterInfo); - } - - public void addBackends(List> hostPortPairs, boolean isFree) throws DdlException { - for (Pair pair : hostPortPairs) { - // check is already exist - if (getBackendWithHeartbeatPort(pair.first, pair.second) != null) { - throw new DdlException("Same backend already exists[" + pair.first + ":" + pair.second + "]"); - } - } - - for (Pair pair : hostPortPairs) { + public SystemInfoService() { + super("cluster info service", FeConstants.heartbeat_interval_second * 1000); + idToBackendRef = new AtomicReference>(ImmutableMap. 
of()); + idToHeartbeatHandlerRef = new AtomicReference>( + ImmutableMap. of()); + idToReportVersionRef = new AtomicReference>( + ImmutableMap. of()); + + executor = Executors.newCachedThreadPool(); + + eventBus = new EventBus("backendEvent"); + + lastBackendIdForCreationMap = new ConcurrentHashMap(); + lastBackendIdForOtherMap = new ConcurrentHashMap(); + } + + public EventBus getEventBus() { + return this.eventBus; + } + + public void setMaster(String masterHost, int masterPort, int clusterId, String token, long epoch) { + TMasterInfo tMasterInfo = new TMasterInfo(new TNetworkAddress(masterHost, masterPort), clusterId, epoch); + tMasterInfo.setToken(token); + masterInfo.set(tMasterInfo); + } + + public void addBackends(List> hostPortPairs, boolean isFree) throws DdlException { + for (Pair pair : hostPortPairs) { + // check is already exist + if (getBackendWithHeartbeatPort(pair.first, pair.second) != null) { + throw new DdlException("Same backend already exists[" + pair.first + ":" + pair.second + "]"); + } + } + + for (Pair pair : hostPortPairs) { addBackend(pair.first, pair.second, isFree); - } - } - - // for test - public void addBackend(Backend backend) { - Map copiedBackends = Maps.newHashMap(idToBackendRef.get()); - copiedBackends.put(backend.getId(), backend); - ImmutableMap newIdToBackend = ImmutableMap.copyOf(copiedBackends); - idToBackendRef.set(newIdToBackend); - } - - private void addBackend(String host, int heartbeatPort, boolean isFree) throws DdlException { - Backend newBackend = new Backend(Catalog.getInstance().getNextId(), host, heartbeatPort); - // update idToBackend - Map copiedBackends = Maps.newHashMap(idToBackendRef.get()); - copiedBackends.put(newBackend.getId(), newBackend); - ImmutableMap newIdToBackend = ImmutableMap.copyOf(copiedBackends); - idToBackendRef.set(newIdToBackend); - - // set new backend's report version as 0L - Map copiedReportVerions = Maps.newHashMap(idToReportVersionRef.get()); - 
copiedReportVerions.put(newBackend.getId(), new AtomicLong(0L)); - ImmutableMap newIdToReportVersion = ImmutableMap.copyOf(copiedReportVerions); - idToReportVersionRef.set(newIdToReportVersion); - - // update idToHeartbeatHandler - Map copiedHeartbeatHandlersMap = Maps.newHashMap(idToHeartbeatHandlerRef.get()); - TNetworkAddress tNetworkAddress = new TNetworkAddress(newBackend.getHost(), newBackend.getHeartbeatPort()); - HeartbeatHandler heartbeatHandler = new HeartbeatHandler(newBackend, tNetworkAddress); - copiedHeartbeatHandlersMap.put(newBackend.getId(), heartbeatHandler); - ImmutableMap newIdToHeartbeatHandler = ImmutableMap.copyOf(copiedHeartbeatHandlersMap); - idToHeartbeatHandlerRef.set(newIdToHeartbeatHandler); + } + } + + // for test + public void addBackend(Backend backend) { + Map copiedBackends = Maps.newHashMap(idToBackendRef.get()); + copiedBackends.put(backend.getId(), backend); + ImmutableMap newIdToBackend = ImmutableMap.copyOf(copiedBackends); + idToBackendRef.set(newIdToBackend); + } + + private void addBackend(String host, int heartbeatPort, boolean isFree) throws DdlException { + Backend newBackend = new Backend(Catalog.getInstance().getNextId(), host, heartbeatPort); + // update idToBackend + Map copiedBackends = Maps.newHashMap(idToBackendRef.get()); + copiedBackends.put(newBackend.getId(), newBackend); + ImmutableMap newIdToBackend = ImmutableMap.copyOf(copiedBackends); + idToBackendRef.set(newIdToBackend); + + // set new backend's report version as 0L + Map copiedReportVerions = Maps.newHashMap(idToReportVersionRef.get()); + copiedReportVerions.put(newBackend.getId(), new AtomicLong(0L)); + ImmutableMap newIdToReportVersion = ImmutableMap.copyOf(copiedReportVerions); + idToReportVersionRef.set(newIdToReportVersion); + + // update idToHeartbeatHandler + Map copiedHeartbeatHandlersMap = Maps.newHashMap(idToHeartbeatHandlerRef.get()); + TNetworkAddress tNetworkAddress = new TNetworkAddress(newBackend.getHost(), newBackend.getHeartbeatPort()); 
+ HeartbeatHandler heartbeatHandler = new HeartbeatHandler(newBackend, tNetworkAddress); + copiedHeartbeatHandlersMap.put(newBackend.getId(), heartbeatHandler); + ImmutableMap newIdToHeartbeatHandler = ImmutableMap.copyOf(copiedHeartbeatHandlersMap); + idToHeartbeatHandlerRef.set(newIdToHeartbeatHandler); // to add be to DEFAULT_CLUSTER if (!isFree) { @@ -183,156 +184,156 @@ private void addBackend(String host, int heartbeatPort, boolean isFree) throws D } // log - Catalog.getInstance().getEditLog().logAddBackend(newBackend); - LOG.info("add backend[" + newBackend.getId() + ". " + newBackend.getHost() + ":" + newBackend.getHeartbeatPort() - + ":" + newBackend.getBePort() + ":" + newBackend.getBePort() + ":" + newBackend.getHttpPort() + "]"); - } - - public void checkBackendsExist(List> hostPortPairs) throws DdlException { - for (Pair pair : hostPortPairs) { - // check if exist - if (getBackendWithHeartbeatPort(pair.first, pair.second) == null) { - throw new DdlException("Backend does not exist[" + pair.first + ":" + pair.second + "]"); - } - } - } - - public void dropBackends(List> hostPortPairs) throws DdlException { - for (Pair pair : hostPortPairs) { - // check is already exist - if (getBackendWithHeartbeatPort(pair.first, pair.second) == null) { - throw new DdlException("backend does not exists[" + pair.first + ":" + pair.second + "]"); - } - } - - for (Pair pair : hostPortPairs) { - dropBackend(pair.first, pair.second); - } - } - - public void dropBackend(long backendId) throws DdlException { - Backend backend = getBackend(backendId); - if (backend == null) { - throw new DdlException("Backend[" + backendId + "] does not exist"); - } - - dropBackend(backend.getHost(), backend.getHeartbeatPort()); - } - - private void dropBackend(String host, int heartbeatPort) throws DdlException { - if (getBackendWithHeartbeatPort(host, heartbeatPort) == null) { - throw new DdlException("backend does not exists[" + host + ":" + heartbeatPort + "]"); - } - - Backend 
droppedBackend = getBackendWithHeartbeatPort(host, heartbeatPort); - - // publish - eventBus.post(new BackendEvent(BackendEventType.BACKEND_DROPPED, "backend has been dropped", - Long.valueOf(droppedBackend.getId()))); - - // update idToBackend - Map copiedBackends = Maps.newHashMap(idToBackendRef.get()); - copiedBackends.remove(droppedBackend.getId()); - ImmutableMap newIdToBackend = ImmutableMap.copyOf(copiedBackends); - idToBackendRef.set(newIdToBackend); - - // update idToReportVersion - Map copiedReportVerions = Maps.newHashMap(idToReportVersionRef.get()); - copiedReportVerions.remove(droppedBackend.getId()); - ImmutableMap newIdToReportVersion = ImmutableMap.copyOf(copiedReportVerions); - idToReportVersionRef.set(newIdToReportVersion); - - // update idToHeartbeatHandler - Map copiedHeartbeatHandlersMap = Maps.newHashMap(idToHeartbeatHandlerRef.get()); - copiedHeartbeatHandlersMap.remove(droppedBackend.getId()); - ImmutableMap newIdToHeartbeatHandler = ImmutableMap.copyOf(copiedHeartbeatHandlersMap); - idToHeartbeatHandlerRef.set(newIdToHeartbeatHandler); + Catalog.getInstance().getEditLog().logAddBackend(newBackend); + LOG.info("add backend[" + newBackend.getId() + ". 
" + newBackend.getHost() + ":" + newBackend.getHeartbeatPort() + + ":" + newBackend.getBePort() + ":" + newBackend.getBePort() + ":" + newBackend.getHttpPort() + "]"); + } + + public void checkBackendsExist(List> hostPortPairs) throws DdlException { + for (Pair pair : hostPortPairs) { + // check if exist + if (getBackendWithHeartbeatPort(pair.first, pair.second) == null) { + throw new DdlException("Backend does not exist[" + pair.first + ":" + pair.second + "]"); + } + } + } + + public void dropBackends(List> hostPortPairs) throws DdlException { + for (Pair pair : hostPortPairs) { + // check is already exist + if (getBackendWithHeartbeatPort(pair.first, pair.second) == null) { + throw new DdlException("backend does not exists[" + pair.first + ":" + pair.second + "]"); + } + } + + for (Pair pair : hostPortPairs) { + dropBackend(pair.first, pair.second); + } + } + + public void dropBackend(long backendId) throws DdlException { + Backend backend = getBackend(backendId); + if (backend == null) { + throw new DdlException("Backend[" + backendId + "] does not exist"); + } + + dropBackend(backend.getHost(), backend.getHeartbeatPort()); + } + + private void dropBackend(String host, int heartbeatPort) throws DdlException { + if (getBackendWithHeartbeatPort(host, heartbeatPort) == null) { + throw new DdlException("backend does not exists[" + host + ":" + heartbeatPort + "]"); + } + + Backend droppedBackend = getBackendWithHeartbeatPort(host, heartbeatPort); + + // publish + eventBus.post(new BackendEvent(BackendEventType.BACKEND_DROPPED, "backend has been dropped", + Long.valueOf(droppedBackend.getId()))); + + // update idToBackend + Map copiedBackends = Maps.newHashMap(idToBackendRef.get()); + copiedBackends.remove(droppedBackend.getId()); + ImmutableMap newIdToBackend = ImmutableMap.copyOf(copiedBackends); + idToBackendRef.set(newIdToBackend); + + // update idToReportVersion + Map copiedReportVerions = Maps.newHashMap(idToReportVersionRef.get()); + 
copiedReportVerions.remove(droppedBackend.getId()); + ImmutableMap newIdToReportVersion = ImmutableMap.copyOf(copiedReportVerions); + idToReportVersionRef.set(newIdToReportVersion); + + // update idToHeartbeatHandler + Map copiedHeartbeatHandlersMap = Maps.newHashMap(idToHeartbeatHandlerRef.get()); + copiedHeartbeatHandlersMap.remove(droppedBackend.getId()); + ImmutableMap newIdToHeartbeatHandler = ImmutableMap.copyOf(copiedHeartbeatHandlersMap); + idToHeartbeatHandlerRef.set(newIdToHeartbeatHandler); // update cluster final Cluster cluster = Catalog.getInstance().getCluster(droppedBackend.getOwnerClusterName()); if (null != cluster) { cluster.removeBackend(droppedBackend.getId()); } else { - LOG.error("Cluster " + droppedBackend.getOwnerClusterName() + " no exist."); - } - // log - Catalog.getInstance().getEditLog().logDropBackend(droppedBackend); - LOG.info("drop {}", droppedBackend); - } - - // only for test - public void dropAllBackend() { - // update idToBackend - idToBackendRef.set(ImmutableMap. of()); - - // update idToReportVersion - idToReportVersionRef.set(ImmutableMap. of()); - - // update idToHeartbeatHandler - idToHeartbeatHandlerRef.set(ImmutableMap. 
of()); - } - - public Backend getBackend(long backendId) { - return idToBackendRef.get().get(backendId); - } - - public boolean checkBackendAvailable(long backendId) { - Backend backend = idToBackendRef.get().get(backendId); - if (backend == null || !backend.isAlive() || backend.isDecommissioned()) { - return false; - } - return true; - } - - public Backend getBackendWithHeartbeatPort(String host, int heartPort) { - ImmutableMap idToBackend = idToBackendRef.get(); - for (Backend backend : idToBackend.values()) { - if (backend.getHost().equals(host) && backend.getHeartbeatPort() == heartPort) { - return backend; - } - } - return null; - } - - public Backend getBackendWithBePort(String host, int bePort) { - ImmutableMap idToBackend = idToBackendRef.get(); - for (Backend backend : idToBackend.values()) { - if (backend.getHost().equals(host) && backend.getBePort() == bePort) { - return backend; - } - } - return null; - } - - public List getBackendIds(boolean needAlive) { - ImmutableMap idToBackend = idToBackendRef.get(); - List backendIds = Lists.newArrayList(idToBackend.keySet()); - if (!needAlive) { - return backendIds; - } else { - Iterator iter = backendIds.iterator(); - while (iter.hasNext()) { - Backend backend = this.getBackend(iter.next()); - if (backend == null || !backend.isAlive()) { - iter.remove(); - } - } - return backendIds; - } - } - + LOG.error("Cluster " + droppedBackend.getOwnerClusterName() + " no exist."); + } + // log + Catalog.getInstance().getEditLog().logDropBackend(droppedBackend); + LOG.info("drop {}", droppedBackend); + } + + // only for test + public void dropAllBackend() { + // update idToBackend + idToBackendRef.set(ImmutableMap. of()); + + // update idToReportVersion + idToReportVersionRef.set(ImmutableMap. of()); + + // update idToHeartbeatHandler + idToHeartbeatHandlerRef.set(ImmutableMap. 
of()); + } + + public Backend getBackend(long backendId) { + return idToBackendRef.get().get(backendId); + } + + public boolean checkBackendAvailable(long backendId) { + Backend backend = idToBackendRef.get().get(backendId); + if (backend == null || !backend.isAlive() || backend.isDecommissioned()) { + return false; + } + return true; + } + + public Backend getBackendWithHeartbeatPort(String host, int heartPort) { + ImmutableMap idToBackend = idToBackendRef.get(); + for (Backend backend : idToBackend.values()) { + if (backend.getHost().equals(host) && backend.getHeartbeatPort() == heartPort) { + return backend; + } + } + return null; + } + + public Backend getBackendWithBePort(String host, int bePort) { + ImmutableMap idToBackend = idToBackendRef.get(); + for (Backend backend : idToBackend.values()) { + if (backend.getHost().equals(host) && backend.getBePort() == bePort) { + return backend; + } + } + return null; + } + + public List getBackendIds(boolean needAlive) { + ImmutableMap idToBackend = idToBackendRef.get(); + List backendIds = Lists.newArrayList(idToBackend.keySet()); + if (!needAlive) { + return backendIds; + } else { + Iterator iter = backendIds.iterator(); + while (iter.hasNext()) { + Backend backend = this.getBackend(iter.next()); + if (backend == null || !backend.isAlive()) { + iter.remove(); + } + } + return backendIds; + } + } + /** * choose backends to create cluster - * + * * @param clusterName * @param instanceNum * @return - */ - public List createCluster(String clusterName, int instanceNum) { - final List chosenBackendIds = Lists.newArrayList(); + */ + public List createCluster(String clusterName, int instanceNum) { + final List chosenBackendIds = Lists.newArrayList(); final Map> hostBackendsMap = getHostBackendsMap(true /* need alive*/, true /* need free */, - false /* can not be in decommission*/); + false /* can not be in decommission*/); LOG.info("begin to create cluster {} with instance num: {}", clusterName, instanceNum); int 
availableBackendsCount = 0; @@ -342,17 +343,17 @@ public List createCluster(String clusterName, int instanceNum) { availableBackendsCount += list.size(); hostBackendsList.add(list); } - + if (instanceNum > availableBackendsCount) { LOG.warn("not enough available backends. requires :" + instanceNum + ", available:" + availableBackendsCount); return null; } - - // sort by number of backend in host + + // sort by number of backend in host Collections.sort(hostBackendsList, hostBackendsListComparator); - // hostIsEmpty is used to mark if host is empty, so avoid + // hostIsEmpty is used to mark if host is empty, so avoid // iterating hostIsEmpty with numOfHost in every circle. boolean[] hostIsEmpty = new boolean[hostBackendsList.size()]; for (int i = 0; i < hostBackendsList.size(); i++) { @@ -367,7 +368,7 @@ public List createCluster(String clusterName, int instanceNum) { // avoid counting repeatedly if (hostIsEmpty[i] == false) { hostIsEmpty[i] = true; - numOfHost--; + numOfHost--; } } if (chosenBackendIds.size() == instanceNum || numOfHost == 0) { @@ -377,61 +378,59 @@ public List createCluster(String clusterName, int instanceNum) { if (chosenBackendIds.size() != instanceNum) { LOG.warn("not enough available backends. 
require :" + instanceNum + " get:" + chosenBackendIds.size()); - return null; - } - - lastBackendIdForCreationMap.put(clusterName, (long) -1); - lastBackendIdForOtherMap.put(clusterName, (long) -1); - return chosenBackendIds; - } - - - /** - * remove backends in cluster - * - * @param backendList - * @throws DdlException - */ - public void releaseBackends(String clusterName, boolean isReplay) { - ImmutableMap idToBackend = idToBackendRef.get(); - final List backendIds = getClusterBackendIds(clusterName); - final Iterator iterator = backendIds.iterator(); - - while (iterator.hasNext()) { - final Long id = iterator.next(); + return null; + } + + lastBackendIdForCreationMap.put(clusterName, (long) -1); + lastBackendIdForOtherMap.put(clusterName, (long) -1); + return chosenBackendIds; + } + + + /** + * remove backends in cluster + * + * @throws DdlException + */ + public void releaseBackends(String clusterName, boolean isReplay) { + ImmutableMap idToBackend = idToBackendRef.get(); + final List backendIds = getClusterBackendIds(clusterName); + final Iterator iterator = backendIds.iterator(); + + while (iterator.hasNext()) { + final Long id = iterator.next(); if (!idToBackend.containsKey(id)) { - LOG.warn("cluster {} contain backend {} that does't exist", clusterName, id); - } else { - final Backend backend = idToBackend.get(id); - backend.setBackendState(BackendState.free); - backend.clearClusterName(); - if (!isReplay) { - Catalog.getInstance().getEditLog().logBackendStateChange(backend); - } - } - } - - lastBackendIdForCreationMap.remove(clusterName); - lastBackendIdForOtherMap.remove(clusterName); - } - + LOG.warn("cluster {} contain backend {} that does't exist", clusterName, id); + } else { + final Backend backend = idToBackend.get(id); + backend.setBackendState(BackendState.free); + backend.clearClusterName(); + if (!isReplay) { + Catalog.getInstance().getEditLog().logBackendStateChange(backend); + } + } + } + + lastBackendIdForCreationMap.remove(clusterName); + 
lastBackendIdForOtherMap.remove(clusterName); + } + /** * select host where has least free backends , be's state become free when decommission finish - * - * @param backendList + * * @param shrinkNum * @return - */ + */ public List calculateDecommissionBackends(String clusterName, int shrinkNum) { LOG.info("calculate decommission backend in cluster: {}. decommission num:", clusterName, shrinkNum); - - final List decomBackendIds = Lists.newArrayList(); - ImmutableMap idToBackends = idToBackendRef.get(); + + final List decomBackendIds = Lists.newArrayList(); + ImmutableMap idToBackends = idToBackendRef.get(); final List clusterBackends = getClusterBackendIds(clusterName); - // host -> backends of this cluster + // host -> backends of this cluster final Map> hostBackendsMapInCluster = Maps.newHashMap(); - // put backend in same host in list + // put backend in same host in list for (Long id : clusterBackends) { final Backend backend = idToBackends.get(id); if (hostBackendsMapInCluster.containsKey(backend.getHost())) { @@ -453,7 +452,7 @@ public List calculateDecommissionBackends(String clusterName, int shrinkNu // in each cycle, choose one backend from the host which has maximal backends num. // break if all hosts are empty or get enough backends. while (true) { - if (hostList.get(0).size() > 0) { + if (hostList.get(0).size() > 0) { decomBackendIds.add(hostList.get(0).remove(0).getId()); if (decomBackendIds.size() == shrinkNum) { // enough @@ -463,39 +462,39 @@ public List calculateDecommissionBackends(String clusterName, int shrinkNu } else { // all hosts empty break; - } + } } if (decomBackendIds.size() != shrinkNum) { LOG.info("failed to get enough backends to shrink in cluster: {}. required: {}, get: {}", - shrinkNum, decomBackendIds.size()); - return null; - } - - return decomBackendIds; - } - + shrinkNum, decomBackendIds.size()); + return null; + } + + return decomBackendIds; + } + /** * to expand backends in cluster. 
* firstly, acquire backends from hosts not in this cluster. * if not enough, secondly acquire backends from hosts in this cluster, returns a list of hosts - * sorted by the descending order of the number of backend in the first two ways, + * sorted by the descending order of the number of backend in the first two ways, * and get backends from the list in cycle. - * + * * @param clusterName * @param expansionNum * @return - */ + */ public List calculateExpansionBackends(String clusterName, int expansionNum) { LOG.debug("calculate expansion backend in cluster: {}, new instance num: {}", clusterName, expansionNum); - - final List chosenBackendIds = Lists.newArrayList(); - ImmutableMap idToBackends = idToBackendRef.get(); + + final List chosenBackendIds = Lists.newArrayList(); + ImmutableMap idToBackends = idToBackendRef.get(); // host -> backends final Map> hostBackendsMap = getHostBackendsMap(true /* need alive*/, true /* need free */, - false /* can not be in decommission */); - final List clusterBackends = getClusterBackendIds(clusterName); + false /* can not be in decommission */); + final List clusterBackends = getClusterBackendIds(clusterName); // hosts not in cluster List> hostsNotInCluster = Lists.newArrayList(); @@ -518,7 +517,7 @@ public List calculateExpansionBackends(String clusterName, int expansionNu hostsNotInCluster.add(entry.getValue()); } } - + if (expansionNum > availableBackendsCount) { LOG.info("not enough available backends. 
requires :" + expansionNum + ", available:" + availableBackendsCount); @@ -530,21 +529,21 @@ public List calculateExpansionBackends(String clusterName, int expansionNu // first select backends which belong to the hosts NOT IN this cluster if (hostsNotInCluster.size() > 0) { - // hostIsEmpty is userd to mark if host is empty, so - // avoid iterating hostIsEmpty with numOfHost in every circle + // hostIsEmpty is userd to mark if host is empty, so + // avoid iterating hostIsEmpty with numOfHost in every circle boolean[] hostIsEmpty = new boolean[hostsNotInCluster.size()]; for (int i = 0; i < hostsNotInCluster.size(); i++) { hostIsEmpty[i] = false; } - int numOfHost = hostsNotInCluster.size(); + int numOfHost = hostsNotInCluster.size(); for (int i = 0;; i = ++i % hostsNotInCluster.size()) { if (hostsNotInCluster.get(i).size() > 0) { - chosenBackendIds.add(hostsNotInCluster.get(i).remove(0).getId()); + chosenBackendIds.add(hostsNotInCluster.get(i).remove(0).getId()); } else { // avoid counting repeatedly if (hostIsEmpty[i] == false) { hostIsEmpty[i] = true; - numOfHost--; + numOfHost--; } } if (chosenBackendIds.size() == expansionNum || numOfHost == 0) { @@ -566,7 +565,7 @@ public List calculateExpansionBackends(String clusterName, int expansionNu } else { if (hostIsEmpty[i] == false) { hostIsEmpty[i] = true; - numOfHost--; + numOfHost--; } } if (chosenBackendIds.size() == expansionNum || numOfHost == 0) { @@ -577,190 +576,190 @@ public List calculateExpansionBackends(String clusterName, int expansionNu if (chosenBackendIds.size() != expansionNum) { LOG.info("not enough available backends. 
requires :" + expansionNum - + ", get:" + chosenBackendIds.size()); - return null; - } - - // set be state and owner - Iterator iterator = chosenBackendIds.iterator(); - while (iterator.hasNext()) { - final Long id = iterator.next(); - final Backend backend = idToBackends.get(id); - backend.setOwnerClusterName(clusterName); - backend.setBackendState(BackendState.using); - Catalog.getInstance().getEditLog().logBackendStateChange(backend); - } - return chosenBackendIds; - } - - /** - * get cluster's backend id list - * - * @param name - * @return - */ - public List getClusterBackends(String name) { - final Map copiedBackends = Maps.newHashMap(idToBackendRef.get()); - final List ret = Lists.newArrayList(); - - if (Strings.isNullOrEmpty(name)) { - return ret; - } - - for (Backend backend : copiedBackends.values()) { - if (name.equals(backend.getOwnerClusterName())) { - ret.add(backend); - } - } - return ret; - } - - /** - * get cluster's backend id list - * - * @param name - * @return - */ - public List getClusterBackends(String name, boolean needAlive) { - final Map copiedBackends = Maps.newHashMap(idToBackendRef.get()); - final List ret = new ArrayList(); - - if (Strings.isNullOrEmpty(name)) { - return null; - } - - if (needAlive) { - for (Backend backend : copiedBackends.values()) { - if (name.equals(backend.getOwnerClusterName())) { - if (backend != null && backend.isAlive()) { - ret.add(backend); - } - } - } - } else { - for (Backend backend : copiedBackends.values()) { - if (name.equals(backend.getOwnerClusterName())) { - ret.add(backend); - } - } - } - - return ret; - } - + + ", get:" + chosenBackendIds.size()); + return null; + } + + // set be state and owner + Iterator iterator = chosenBackendIds.iterator(); + while (iterator.hasNext()) { + final Long id = iterator.next(); + final Backend backend = idToBackends.get(id); + backend.setOwnerClusterName(clusterName); + backend.setBackendState(BackendState.using); + 
Catalog.getInstance().getEditLog().logBackendStateChange(backend); + } + return chosenBackendIds; + } + + /** + * get cluster's backend id list + * + * @param name + * @return + */ + public List getClusterBackends(String name) { + final Map copiedBackends = Maps.newHashMap(idToBackendRef.get()); + final List ret = Lists.newArrayList(); + + if (Strings.isNullOrEmpty(name)) { + return ret; + } + + for (Backend backend : copiedBackends.values()) { + if (name.equals(backend.getOwnerClusterName())) { + ret.add(backend); + } + } + return ret; + } + + /** + * get cluster's backend id list + * + * @param name + * @return + */ + public List getClusterBackends(String name, boolean needAlive) { + final Map copiedBackends = Maps.newHashMap(idToBackendRef.get()); + final List ret = new ArrayList(); + + if (Strings.isNullOrEmpty(name)) { + return null; + } + + if (needAlive) { + for (Backend backend : copiedBackends.values()) { + if (name.equals(backend.getOwnerClusterName())) { + if (backend != null && backend.isAlive()) { + ret.add(backend); + } + } + } + } else { + for (Backend backend : copiedBackends.values()) { + if (name.equals(backend.getOwnerClusterName())) { + ret.add(backend); + } + } + } + + return ret; + } + /** * get cluster's backend id list - * + * * @param clusterName * @return - */ + */ public List getClusterBackendIds(String clusterName) { if (Strings.isNullOrEmpty(clusterName)) { return null; } - ImmutableMap idToBackend = idToBackendRef.get(); - final List beIds = Lists.newArrayList(); - - for (Backend backend : idToBackend.values()) { - if (clusterName.equals(backend.getOwnerClusterName())) { - beIds.add(backend.getId()); - } - } - return beIds; - } - - /** - * get cluster's backend id list - * - * @param name - * @return - */ - public List getClusterBackendIds(String name, boolean needAlive) { - final Map copiedBackends = Maps.newHashMap(idToBackendRef.get()); - final List ret = new ArrayList(); - - if (Strings.isNullOrEmpty(name)) { - return null; - } - - 
if (needAlive) { - for (Backend backend : copiedBackends.values()) { - if (name.equals(backend.getOwnerClusterName())) { - if (backend != null && backend.isAlive()) { - ret.add(backend.getId()); - } - } - } - } else { - for (Backend backend : copiedBackends.values()) { - if (name.equals(backend.getOwnerClusterName())) { - ret.add(backend.getId()); - } - } - } - - return ret; - } - - /** - * return backend list in every host - * - * @return - */ + ImmutableMap idToBackend = idToBackendRef.get(); + final List beIds = Lists.newArrayList(); + + for (Backend backend : idToBackend.values()) { + if (clusterName.equals(backend.getOwnerClusterName())) { + beIds.add(backend.getId()); + } + } + return beIds; + } + + /** + * get cluster's backend id list + * + * @param name + * @return + */ + public List getClusterBackendIds(String name, boolean needAlive) { + final Map copiedBackends = Maps.newHashMap(idToBackendRef.get()); + final List ret = new ArrayList(); + + if (Strings.isNullOrEmpty(name)) { + return null; + } + + if (needAlive) { + for (Backend backend : copiedBackends.values()) { + if (name.equals(backend.getOwnerClusterName())) { + if (backend != null && backend.isAlive()) { + ret.add(backend.getId()); + } + } + } + } else { + for (Backend backend : copiedBackends.values()) { + if (name.equals(backend.getOwnerClusterName())) { + ret.add(backend.getId()); + } + } + } + + return ret; + } + + /** + * return backend list in every host + * + * @return + */ private Map> getHostBackendsMap(boolean needAlive, boolean needFree, - boolean canBeDecommission) { - final Map copiedBackends = Maps.newHashMap(idToBackendRef.get()); - final Map> classMap = Maps.newHashMap(); - - // to select backend where state is free - for (Backend backend : copiedBackends.values()) { - if ((needAlive && !backend.isAlive()) || (needFree && !backend.isFreeFromCluster()) - || (!canBeDecommission && backend.isDecommissioned())) { - continue; - } - if (classMap.containsKey(backend.getHost())) { - final 
List list = classMap.get(backend.getHost()); - list.add(backend); - classMap.put(backend.getHost(), list); - } else { - final List list = new ArrayList(); - list.add(backend); - classMap.put(backend.getHost(), list); - } - } - return classMap; - } - - // choose backends by round robin - // return null if not enough backend - // use synchronized to run serially - public synchronized List seqChooseBackendIds(int backendNum, boolean needAlive, boolean isCreate, - String clusterName) { - long lastBackendId = -1L; - - if (clusterName.equals(DEFAULT_CLUSTER)) { - if (isCreate) { - lastBackendId = lastBackendIdForCreation; - } else { - lastBackendId = lastBackendIdForOther; - } - } else { - if (isCreate) { - if (lastBackendIdForCreationMap.containsKey(clusterName)) { - lastBackendId = lastBackendIdForCreationMap.get(clusterName); - } else { - lastBackendId = -1; - lastBackendIdForCreationMap.put(clusterName, lastBackendId); - } - } else { - if (lastBackendIdForOtherMap.containsKey(clusterName)) { - lastBackendId = lastBackendIdForOtherMap.get(clusterName); - } else { - lastBackendId = -1; - lastBackendIdForOtherMap.put(clusterName, lastBackendId); - } - } + boolean canBeDecommission) { + final Map copiedBackends = Maps.newHashMap(idToBackendRef.get()); + final Map> classMap = Maps.newHashMap(); + + // to select backend where state is free + for (Backend backend : copiedBackends.values()) { + if ((needAlive && !backend.isAlive()) || (needFree && !backend.isFreeFromCluster()) + || (!canBeDecommission && backend.isDecommissioned())) { + continue; + } + if (classMap.containsKey(backend.getHost())) { + final List list = classMap.get(backend.getHost()); + list.add(backend); + classMap.put(backend.getHost(), list); + } else { + final List list = new ArrayList(); + list.add(backend); + classMap.put(backend.getHost(), list); + } + } + return classMap; + } + + // choose backends by round robin + // return null if not enough backend + // use synchronized to run serially + public 
synchronized List seqChooseBackendIds(int backendNum, boolean needAlive, boolean isCreate, + String clusterName) { + long lastBackendId = -1L; + + if (clusterName.equals(DEFAULT_CLUSTER)) { + if (isCreate) { + lastBackendId = lastBackendIdForCreation; + } else { + lastBackendId = lastBackendIdForOther; + } + } else { + if (isCreate) { + if (lastBackendIdForCreationMap.containsKey(clusterName)) { + lastBackendId = lastBackendIdForCreationMap.get(clusterName); + } else { + lastBackendId = -1; + lastBackendIdForCreationMap.put(clusterName, lastBackendId); + } + } else { + if (lastBackendIdForOtherMap.containsKey(clusterName)) { + lastBackendId = lastBackendIdForOtherMap.get(clusterName); + } else { + lastBackendId = -1; + lastBackendIdForOtherMap.put(clusterName, lastBackendId); + } + } } // put backend with same host in same list @@ -768,7 +767,7 @@ public synchronized List seqChooseBackendIds(int backendNum, boolean needA Map> backendMaps = Maps.newHashMap(); for (Backend backend : srcBackends) { if (backendMaps.containsKey(backend.getHost())){ - backendMaps.get(backend.getHost()).add(backend); + backendMaps.get(backend.getHost()).add(backend); } else { List list = Lists.newArrayList(); list.add(backend); @@ -782,86 +781,86 @@ public synchronized List seqChooseBackendIds(int backendNum, boolean needA Collections.shuffle(list); backends.add(list.get(0)); } - + Collections.shuffle(backends); - - List backendIds = Lists.newArrayList(); - // get last backend index - int lastBackendIndex = -1; - int index = -1; - for (Backend backend : backends) { - index++; - if (backend.getId() == lastBackendId) { - lastBackendIndex = index; - break; - } - } - Iterator iterator = Iterators.cycle(backends); - index = -1; - boolean failed = false; - // 2 cycle at most - int maxIndex = 2 * backends.size(); - while (iterator.hasNext() && backendIds.size() < backendNum) { - Backend backend = iterator.next(); - index++; - if (index <= lastBackendIndex) { - continue; - } - - if (index > 
maxIndex) { - failed = true; - break; - } - - if (needAlive) { - if (!backend.isAlive() || backend.isDecommissioned()) { - continue; - } - } - - long backendId = backend.getId(); - if (!backendIds.contains(backendId)) { - backendIds.add(backendId); - lastBackendId = backendId; - } else { - failed = true; - break; - } - } - - if (clusterName.equals(DEFAULT_CLUSTER)) { - if (isCreate) { - lastBackendIdForCreation = lastBackendId; - } else { - lastBackendIdForOther = lastBackendId; - } - } else { - // update last backendId - if (isCreate) { - lastBackendIdForCreationMap.put(clusterName, lastBackendId); - } else { - lastBackendIdForOtherMap.put(clusterName, lastBackendId); - } - } - if (backendIds.size() != backendNum) { - failed = true; - } - - if (!failed) { - return backendIds; - } - - // debug - for (Backend backend : backends) { - LOG.debug("random select: {}", backend.toString()); - } - - return null; - } - - public ImmutableMap getIdToBackend() { - return idToBackendRef.get(); - } + + List backendIds = Lists.newArrayList(); + // get last backend index + int lastBackendIndex = -1; + int index = -1; + for (Backend backend : backends) { + index++; + if (backend.getId() == lastBackendId) { + lastBackendIndex = index; + break; + } + } + Iterator iterator = Iterators.cycle(backends); + index = -1; + boolean failed = false; + // 2 cycle at most + int maxIndex = 2 * backends.size(); + while (iterator.hasNext() && backendIds.size() < backendNum) { + Backend backend = iterator.next(); + index++; + if (index <= lastBackendIndex) { + continue; + } + + if (index > maxIndex) { + failed = true; + break; + } + + if (needAlive) { + if (!backend.isAlive() || backend.isDecommissioned()) { + continue; + } + } + + long backendId = backend.getId(); + if (!backendIds.contains(backendId)) { + backendIds.add(backendId); + lastBackendId = backendId; + } else { + failed = true; + break; + } + } + + if (clusterName.equals(DEFAULT_CLUSTER)) { + if (isCreate) { + lastBackendIdForCreation = 
lastBackendId; + } else { + lastBackendIdForOther = lastBackendId; + } + } else { + // update last backendId + if (isCreate) { + lastBackendIdForCreationMap.put(clusterName, lastBackendId); + } else { + lastBackendIdForOtherMap.put(clusterName, lastBackendId); + } + } + if (backendIds.size() != backendNum) { + failed = true; + } + + if (!failed) { + return backendIds; + } + + // debug + for (Backend backend : backends) { + LOG.debug("random select: {}", backend.toString()); + } + + return null; + } + + public ImmutableMap getIdToBackend() { + return idToBackendRef.get(); + } public ImmutableMap getBackendsInCluster(String cluster) { @@ -873,143 +872,143 @@ public ImmutableMap getBackendsInCluster(String cluster) { for (Backend backend : idToBackendRef.get().values().asList()) { if (cluster.equals(backend.getOwnerClusterName())) { retMaps.put(backend.getId(), backend); - } + } } return ImmutableMap.copyOf(retMaps); - } - - public long getBackendReportVersion(long backendId) { - AtomicLong atomicLong = null; - if ((atomicLong = idToReportVersionRef.get().get(backendId)) == null) { - return -1L; - } else { - return atomicLong.get(); - } - } - - public void updateBackendReportVersion(long backendId, long newReportVersion, long dbId) { - AtomicLong atomicLong = null; - if ((atomicLong = idToReportVersionRef.get().get(backendId)) != null) { - Database db = Catalog.getInstance().getDb(dbId); - if (db != null) { - db.readLock(); - try { - atomicLong.set(newReportVersion); - } finally { - db.readUnlock(); - } - } - } - } - - public long saveBackends(DataOutputStream dos, long checksum) throws IOException { - ImmutableMap idToBackend = idToBackendRef.get(); - int backendCount = idToBackend.size(); - checksum ^= backendCount; - dos.writeInt(backendCount); - for (Map.Entry entry : idToBackend.entrySet()) { - long key = entry.getKey(); - checksum ^= key; - dos.writeLong(key); - entry.getValue().write(dos); - } - return checksum; - } - - public long loadBackends(DataInputStream 
dis, long checksum) throws IOException { - int count = dis.readInt(); - checksum ^= count; - for (int i = 0; i < count; i++) { - long key = dis.readLong(); - checksum ^= key; - Backend backend = Backend.read(dis); - replayAddBackend(backend); - } - return checksum; - } - - public void clear() { - this.idToBackendRef = null; - this.idToHeartbeatHandlerRef = null; - this.idToReportVersionRef = null; - } - - public void registerObserver(SystemInfoObserver observer) { - LOG.info("register observer {} {}: ", observer.getName(), observer.getClass()); - this.eventBus.register(observer); - } - - public void unregisterObserver(SystemInfoObserver observer) { - this.eventBus.unregister(observer); - } - - public static Pair validateHostAndPort(String hostPort) throws AnalysisException { - hostPort = hostPort.replaceAll("\\s+", ""); - if (hostPort.isEmpty()) { - throw new AnalysisException("Invalid host port: " + hostPort); - } - - String[] pair = hostPort.split(":"); - if (pair.length != 2) { - throw new AnalysisException("Invalid host port: " + hostPort); - } - - String host = pair[0]; - if (Strings.isNullOrEmpty(host)) { - throw new AnalysisException("Host is null"); - } - - int heartbeatPort = -1; - try { - // validate host - if (!InetAddressValidator.getInstance().isValid(host)) { - // maybe this is a hostname - // if no IP address for the host could be found, 'getByName' - // will throw - // UnknownHostException - InetAddress inetAddress = InetAddress.getByName(host); - host = inetAddress.getHostAddress(); - } - - // validate port - heartbeatPort = Integer.valueOf(pair[1]); - - if (heartbeatPort <= 0 || heartbeatPort >= 65536) { - throw new AnalysisException("Port is out of range: " + heartbeatPort); - } - - return new Pair(host, heartbeatPort); - } catch (UnknownHostException e) { - throw new AnalysisException("Unknown host: " + e.getMessage()); - } catch (Exception e) { - throw new AnalysisException("Encounter unknown exception: " + e.getMessage()); - } - } - - public 
void replayAddBackend(Backend newBackend) { - // update idToBackend - if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_30) { - newBackend.setOwnerClusterName(DEFAULT_CLUSTER); - newBackend.setBackendState(BackendState.using); - } - Map copiedBackends = Maps.newHashMap(idToBackendRef.get()); - copiedBackends.put(newBackend.getId(), newBackend); - ImmutableMap newIdToBackend = ImmutableMap.copyOf(copiedBackends); - idToBackendRef.set(newIdToBackend); - - // set new backend's report version as 0L - Map copiedReportVerions = Maps.newHashMap(idToReportVersionRef.get()); - copiedReportVerions.put(newBackend.getId(), new AtomicLong(0L)); - ImmutableMap newIdToReportVersion = ImmutableMap.copyOf(copiedReportVerions); - idToReportVersionRef.set(newIdToReportVersion); - - // update idToHeartbeatHandler - Map copiedHeartbeatHandlersMap = Maps.newHashMap(idToHeartbeatHandlerRef.get()); - TNetworkAddress tNetworkAddress = new TNetworkAddress(newBackend.getHost(), newBackend.getHeartbeatPort()); - HeartbeatHandler heartbeatHandler = new HeartbeatHandler(newBackend, tNetworkAddress); - copiedHeartbeatHandlersMap.put(newBackend.getId(), heartbeatHandler); - ImmutableMap newIdToHeartbeatHandler = ImmutableMap.copyOf(copiedHeartbeatHandlersMap); - idToHeartbeatHandlerRef.set(newIdToHeartbeatHandler); + } + + public long getBackendReportVersion(long backendId) { + AtomicLong atomicLong = null; + if ((atomicLong = idToReportVersionRef.get().get(backendId)) == null) { + return -1L; + } else { + return atomicLong.get(); + } + } + + public void updateBackendReportVersion(long backendId, long newReportVersion, long dbId) { + AtomicLong atomicLong = null; + if ((atomicLong = idToReportVersionRef.get().get(backendId)) != null) { + Database db = Catalog.getInstance().getDb(dbId); + if (db != null) { + db.readLock(); + try { + atomicLong.set(newReportVersion); + } finally { + db.readUnlock(); + } + } + } + } + + public long saveBackends(DataOutputStream dos, long 
checksum) throws IOException { + ImmutableMap idToBackend = idToBackendRef.get(); + int backendCount = idToBackend.size(); + checksum ^= backendCount; + dos.writeInt(backendCount); + for (Map.Entry entry : idToBackend.entrySet()) { + long key = entry.getKey(); + checksum ^= key; + dos.writeLong(key); + entry.getValue().write(dos); + } + return checksum; + } + + public long loadBackends(DataInputStream dis, long checksum) throws IOException { + int count = dis.readInt(); + checksum ^= count; + for (int i = 0; i < count; i++) { + long key = dis.readLong(); + checksum ^= key; + Backend backend = Backend.read(dis); + replayAddBackend(backend); + } + return checksum; + } + + public void clear() { + this.idToBackendRef = null; + this.idToHeartbeatHandlerRef = null; + this.idToReportVersionRef = null; + } + + public void registerObserver(SystemInfoObserver observer) { + LOG.info("register observer {} {}: ", observer.getName(), observer.getClass()); + this.eventBus.register(observer); + } + + public void unregisterObserver(SystemInfoObserver observer) { + this.eventBus.unregister(observer); + } + + public static Pair validateHostAndPort(String hostPort) throws AnalysisException { + hostPort = hostPort.replaceAll("\\s+", ""); + if (hostPort.isEmpty()) { + throw new AnalysisException("Invalid host port: " + hostPort); + } + + String[] pair = hostPort.split(":"); + if (pair.length != 2) { + throw new AnalysisException("Invalid host port: " + hostPort); + } + + String host = pair[0]; + if (Strings.isNullOrEmpty(host)) { + throw new AnalysisException("Host is null"); + } + + int heartbeatPort = -1; + try { + // validate host + if (!InetAddressValidator.getInstance().isValid(host)) { + // maybe this is a hostname + // if no IP address for the host could be found, 'getByName' + // will throw + // UnknownHostException + InetAddress inetAddress = InetAddress.getByName(host); + host = inetAddress.getHostAddress(); + } + + // validate port + heartbeatPort = Integer.valueOf(pair[1]); 
+ + if (heartbeatPort <= 0 || heartbeatPort >= 65536) { + throw new AnalysisException("Port is out of range: " + heartbeatPort); + } + + return new Pair(host, heartbeatPort); + } catch (UnknownHostException e) { + throw new AnalysisException("Unknown host: " + e.getMessage()); + } catch (Exception e) { + throw new AnalysisException("Encounter unknown exception: " + e.getMessage()); + } + } + + public void replayAddBackend(Backend newBackend) { + // update idToBackend + if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_30) { + newBackend.setOwnerClusterName(DEFAULT_CLUSTER); + newBackend.setBackendState(BackendState.using); + } + Map copiedBackends = Maps.newHashMap(idToBackendRef.get()); + copiedBackends.put(newBackend.getId(), newBackend); + ImmutableMap newIdToBackend = ImmutableMap.copyOf(copiedBackends); + idToBackendRef.set(newIdToBackend); + + // set new backend's report version as 0L + Map copiedReportVerions = Maps.newHashMap(idToReportVersionRef.get()); + copiedReportVerions.put(newBackend.getId(), new AtomicLong(0L)); + ImmutableMap newIdToReportVersion = ImmutableMap.copyOf(copiedReportVerions); + idToReportVersionRef.set(newIdToReportVersion); + + // update idToHeartbeatHandler + Map copiedHeartbeatHandlersMap = Maps.newHashMap(idToHeartbeatHandlerRef.get()); + TNetworkAddress tNetworkAddress = new TNetworkAddress(newBackend.getHost(), newBackend.getHeartbeatPort()); + HeartbeatHandler heartbeatHandler = new HeartbeatHandler(newBackend, tNetworkAddress); + copiedHeartbeatHandlersMap.put(newBackend.getId(), heartbeatHandler); + ImmutableMap newIdToHeartbeatHandler = ImmutableMap.copyOf(copiedHeartbeatHandlersMap); + idToHeartbeatHandlerRef.set(newIdToHeartbeatHandler); // to add be to DEFAULT_CLUSTER if (newBackend.getBackendState() == BackendState.using) { @@ -1018,31 +1017,31 @@ public void replayAddBackend(Backend newBackend) { // replay log cluster.addBackend(newBackend.getId()); } else { - // This happens in loading image when fe 
is restarted, because loadCluster is after loadBackend, + // This happens in loading image when fe is restarted, because loadCluster is after loadBackend, // cluster is not created. Be in cluster will be updated in loadCluster. } } } - - public void replayDropBackend(Backend backend) { - LOG.debug("replayDropBackend: {}", backend); - // update idToBackend - Map copiedBackends = Maps.newHashMap(idToBackendRef.get()); - copiedBackends.remove(backend.getId()); - ImmutableMap newIdToBackend = ImmutableMap.copyOf(copiedBackends); - idToBackendRef.set(newIdToBackend); - - // update idToReportVersion - Map copiedReportVerions = Maps.newHashMap(idToReportVersionRef.get()); - copiedReportVerions.remove(backend.getId()); - ImmutableMap newIdToReportVersion = ImmutableMap.copyOf(copiedReportVerions); - idToReportVersionRef.set(newIdToReportVersion); - - // update idToHeartbeatHandler - Map copiedHeartbeatHandlersMap = Maps.newHashMap(idToHeartbeatHandlerRef.get()); - copiedHeartbeatHandlersMap.remove(backend.getId()); - ImmutableMap newIdToHeartbeatHandler = ImmutableMap.copyOf(copiedHeartbeatHandlersMap); - idToHeartbeatHandlerRef.set(newIdToHeartbeatHandler); + + public void replayDropBackend(Backend backend) { + LOG.debug("replayDropBackend: {}", backend); + // update idToBackend + Map copiedBackends = Maps.newHashMap(idToBackendRef.get()); + copiedBackends.remove(backend.getId()); + ImmutableMap newIdToBackend = ImmutableMap.copyOf(copiedBackends); + idToBackendRef.set(newIdToBackend); + + // update idToReportVersion + Map copiedReportVerions = Maps.newHashMap(idToReportVersionRef.get()); + copiedReportVerions.remove(backend.getId()); + ImmutableMap newIdToReportVersion = ImmutableMap.copyOf(copiedReportVerions); + idToReportVersionRef.set(newIdToReportVersion); + + // update idToHeartbeatHandler + Map copiedHeartbeatHandlersMap = Maps.newHashMap(idToHeartbeatHandlerRef.get()); + copiedHeartbeatHandlersMap.remove(backend.getId()); + ImmutableMap newIdToHeartbeatHandler = 
ImmutableMap.copyOf(copiedHeartbeatHandlersMap); + idToHeartbeatHandlerRef.set(newIdToHeartbeatHandler); // update cluster final Cluster cluster = Catalog.getInstance().getCluster(backend.getOwnerClusterName()); @@ -1051,104 +1050,104 @@ public void replayDropBackend(Backend backend) { } else { LOG.error("Cluster " + backend.getOwnerClusterName() + " no exist."); } - } - - public void updateBackendState(Backend be) { - long id = be.getId(); - Backend memoryBe = getBackend(id); - memoryBe.setBePort(be.getBePort()); - memoryBe.setAlive(be.isAlive()); - memoryBe.setDecommissioned(be.isDecommissioned()); - memoryBe.setHttpPort(be.getHttpPort()); - memoryBe.setBeRpcPort(be.getBeRpcPort()); - memoryBe.setLastUpdateMs(be.getLastUpdateMs()); - memoryBe.setLastStartTime(be.getLastStartTime()); - memoryBe.setDisks(be.getDisks()); - memoryBe.setBackendState(be.getBackendState()); - memoryBe.setOwnerClusterName(be.getOwnerClusterName()); - memoryBe.setDecommissionType(be.getDecommissionType()); - } - - public long getAvailableCapacityB() { - long capacity = 0L; - ImmutableMap idToBackend = idToBackendRef.get(); - for (Backend backend : idToBackend.values()) { - if (backend.isDecommissioned()) { - capacity -= backend.getTotalCapacityB() - backend.getAvailableCapacityB(); - } else { - capacity += backend.getAvailableCapacityB(); - } - } - return capacity; - } - - public void checkCapacity() throws DdlException { - if (getAvailableCapacityB() <= 0L) { - throw new DdlException("Cluster has no available capacity"); - } - } - - /** - * now we will only check capacity of logic cluster when execute operation - * - * @param clusterName - * @throws DdlException - */ - public void checkClusterCapacity(String clusterName) throws DdlException { - if (getClusterBackends(clusterName).isEmpty()) { - throw new DdlException("Cluster has no available capacity"); - } - } - - @Override - protected void runOneCycle() { - ImmutableMap idToHeartbeatHandler = idToHeartbeatHandlerRef.get(); - Iterator 
iterator = idToHeartbeatHandler.values().iterator(); - while (iterator.hasNext()) { - HeartbeatHandler heartbeatHandler = iterator.next(); - executor.submit(heartbeatHandler); - } - } - - private class HeartbeatHandler implements Runnable { - private Backend backend; - private TNetworkAddress address; - - public HeartbeatHandler(Backend backend, TNetworkAddress networkAddress) { - this.backend = backend; - this.address = networkAddress; - } - - @Override - public void run() { - long backendId = backend.getId(); - HeartbeatService.Client client = null; - boolean ok = false; - try { - client = ClientPool.heartbeatPool.borrowObject(address); - THeartbeatResult result = client.heartbeat(masterInfo.get()); - if (result.getStatus().getStatus_code() == TStatusCode.OK) { - TBackendInfo tBackendInfo = result.getBackend_info(); - int bePort = tBackendInfo.getBe_port(); - int httpPort = tBackendInfo.getHttp_port(); - int beRpcPort = tBackendInfo.getBe_rpc_port(); - backend.updateOnce(bePort, httpPort, beRpcPort); - } else { - LOG.warn("failed to heartbeat backend[" + backendId + "]: " + result.getStatus().toString()); - backend.setBad(eventBus); - } - ok = true; - LOG.debug("backend[{}] host: {}, port: {}", backendId, backend.getHost(), backend.getHeartbeatPort()); - } catch (Exception e) { - LOG.warn("backend[" + backendId + "] got Exception: ", e); - backend.setBad(eventBus); - } finally { - if (ok) { - ClientPool.heartbeatPool.returnObject(address, client); - } else { - ClientPool.heartbeatPool.invalidateObject(address, client); - } - } - } - } -} + } + + public void updateBackendState(Backend be) { + long id = be.getId(); + Backend memoryBe = getBackend(id); + memoryBe.setBePort(be.getBePort()); + memoryBe.setAlive(be.isAlive()); + memoryBe.setDecommissioned(be.isDecommissioned()); + memoryBe.setHttpPort(be.getHttpPort()); + memoryBe.setBeRpcPort(be.getBeRpcPort()); + memoryBe.setLastUpdateMs(be.getLastUpdateMs()); + memoryBe.setLastStartTime(be.getLastStartTime()); + 
memoryBe.setDisks(be.getDisks()); + memoryBe.setBackendState(be.getBackendState()); + memoryBe.setOwnerClusterName(be.getOwnerClusterName()); + memoryBe.setDecommissionType(be.getDecommissionType()); + } + + public long getAvailableCapacityB() { + long capacity = 0L; + ImmutableMap idToBackend = idToBackendRef.get(); + for (Backend backend : idToBackend.values()) { + if (backend.isDecommissioned()) { + capacity -= backend.getTotalCapacityB() - backend.getAvailableCapacityB(); + } else { + capacity += backend.getAvailableCapacityB(); + } + } + return capacity; + } + + public void checkCapacity() throws DdlException { + if (getAvailableCapacityB() <= 0L) { + throw new DdlException("Cluster has no available capacity"); + } + } + + /** + * now we will only check capacity of logic cluster when execute operation + * + * @param clusterName + * @throws DdlException + */ + public void checkClusterCapacity(String clusterName) throws DdlException { + if (getClusterBackends(clusterName).isEmpty()) { + throw new DdlException("Cluster has no available capacity"); + } + } + + @Override + protected void runOneCycle() { + ImmutableMap idToHeartbeatHandler = idToHeartbeatHandlerRef.get(); + Iterator iterator = idToHeartbeatHandler.values().iterator(); + while (iterator.hasNext()) { + HeartbeatHandler heartbeatHandler = iterator.next(); + executor.submit(heartbeatHandler); + } + } + + private class HeartbeatHandler implements Runnable { + private Backend backend; + private TNetworkAddress address; + + public HeartbeatHandler(Backend backend, TNetworkAddress networkAddress) { + this.backend = backend; + this.address = networkAddress; + } + + @Override + public void run() { + long backendId = backend.getId(); + HeartbeatService.Client client = null; + boolean ok = false; + try { + client = ClientPool.heartbeatPool.borrowObject(address); + THeartbeatResult result = client.heartbeat(masterInfo.get()); + if (result.getStatus().getStatus_code() == TStatusCode.OK) { + TBackendInfo 
tBackendInfo = result.getBackend_info(); + int bePort = tBackendInfo.getBe_port(); + int httpPort = tBackendInfo.getHttp_port(); + int beRpcPort = tBackendInfo.getBe_rpc_port(); + backend.updateOnce(bePort, httpPort, beRpcPort); + } else { + LOG.warn("failed to heartbeat backend[" + backendId + "]: " + result.getStatus().toString()); + backend.setBad(eventBus); + } + ok = true; + LOG.debug("backend[{}] host: {}, port: {}", backendId, backend.getHost(), backend.getHeartbeatPort()); + } catch (Exception e) { + LOG.warn("backend[" + backendId + "] got Exception: ", e); + backend.setBad(eventBus); + } finally { + if (ok) { + ClientPool.heartbeatPool.returnObject(address, client); + } else { + ClientPool.heartbeatPool.invalidateObject(address, client); + } + } + } + } +} diff --git a/fe/test/com/baidu/palo/persist/StorageTest.java b/fe/test/com/baidu/palo/persist/StorageTest.java index 6126de6bcd7489..eb1d26627fce1d 100644 --- a/fe/test/com/baidu/palo/persist/StorageTest.java +++ b/fe/test/com/baidu/palo/persist/StorageTest.java @@ -17,139 +17,139 @@ // specific language governing permissions and limitations // under the License. -package com.baidu.palo.persist; - -import org.junit.Assert; -import org.junit.Test; - -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; - -public class StorageTest { - private String meta = "storageTestDir/"; - - public void mkdir() { - File dir = new File(meta); - if (!dir.exists()) { - dir.mkdir(); - } else { - File[] files = dir.listFiles(); - for (File file : files) { - if (file.isFile()) { - file.delete(); - } - } - } - } - - public void addFiles(int image, int edit) { - File imageFile = new File(meta + "image." + image); - try { - imageFile.createNewFile(); - } catch (IOException e) { - e.printStackTrace(); - } - - for (int i = 1; i <= edit; i++) { - File editFile = new File(meta + "edits." 
+ i); - try { - editFile.createNewFile(); - } catch (IOException e) { - e.printStackTrace(); - } - } - - File current = new File(meta + "edits"); - try { - current.createNewFile(); - } catch (IOException e) { - e.printStackTrace(); - } - - File version = new File(meta + "VERSION"); - try { - version.createNewFile(); - String line1 = "#Mon Feb 02 13:59:54 CST 2015\n"; - String line2 = "clusterId=966271669"; - FileWriter fw = new FileWriter(version); - fw.write(line1); - fw.write(line2); - fw.flush(); - fw.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - - public void deleteDir() { - File dir = new File(meta); - if (dir.exists()) { - File[] files = dir.listFiles(); - for (File file : files) { - if (file.isFile()) { - file.delete(); - } - } - - dir.delete(); - } - } - - @Test - public void testConstruct() { - Storage storage1 = new Storage(1, "test"); - Assert.assertEquals(1, storage1.getClusterID()); - Assert.assertEquals("test", storage1.getMetaDir()); - - Storage storage2 = new Storage(1, 2, 3, "test"); - Assert.assertEquals(1, storage2.getClusterID()); - Assert.assertEquals(2, storage2.getImageSeq()); - Assert.assertEquals(3, storage2.getEditsSeq()); - Assert.assertEquals("test", storage2.getMetaDir()); - } - - @Test - public void testStorage() throws Exception { - mkdir(); - addFiles(0, 10); - - Storage storage = new Storage("storageTestDir"); - Assert.assertEquals(966271669, storage.getClusterID()); - storage.setClusterID(1234); - Assert.assertEquals(1234, storage.getClusterID()); - Assert.assertEquals(0, storage.getImageSeq()); - Assert.assertEquals(10, Storage.getMetaSeq(new File("storageTestDir/edits.10"))); - Assert.assertTrue(Storage.getCurrentEditsFile(new File("storageTestDir")) - .equals(new File("storageTestDir/edits"))); - - Assert.assertTrue(storage.getCurrentImageFile().equals(new File("storageTestDir/image.0"))); - Assert.assertTrue(storage.getImageFile(0).equals(new File("storageTestDir/image.0"))); - 
Assert.assertTrue(Storage.getImageFile(new File("storageTestDir"), 0) - .equals(new File("storageTestDir/image.0"))); - - Assert.assertTrue(storage.getCurrentEditsFile().equals(new File("storageTestDir/edits"))); - Assert.assertTrue(storage.getEditsFile(5).equals(new File("storageTestDir/edits.5"))); - Assert.assertTrue(Storage.getEditsFile(new File("storageTestDir"), 3) - .equals(new File("storageTestDir/edits.3"))); - - Assert.assertTrue(storage.getVersionFile().equals(new File("storageTestDir/VERSION"))); - - storage.setImageSeq(100); - Assert.assertEquals(100, storage.getImageSeq()); - - storage.setEditsSeq(100); - Assert.assertEquals(100, storage.getEditsSeq()); - - Assert.assertEquals("storageTestDir", storage.getMetaDir()); - storage.setMetaDir("abcd"); - Assert.assertEquals("abcd", storage.getMetaDir()); - - storage.setMetaDir("storageTestDir"); - storage.clear(); - File file = new File(storage.getMetaDir()); - Assert.assertEquals(0, file.list().length); - - deleteDir(); - } -} +package com.baidu.palo.persist; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; + +public class StorageTest { + private String meta = "storageTestDir/"; + + public void mkdir() { + File dir = new File(meta); + if (!dir.exists()) { + dir.mkdir(); + } else { + File[] files = dir.listFiles(); + for (File file : files) { + if (file.isFile()) { + file.delete(); + } + } + } + } + + public void addFiles(int image, int edit) { + File imageFile = new File(meta + "image." + image); + try { + imageFile.createNewFile(); + } catch (IOException e) { + e.printStackTrace(); + } + + for (int i = 1; i <= edit; i++) { + File editFile = new File(meta + "edits." 
+ i); + try { + editFile.createNewFile(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + File current = new File(meta + "edits"); + try { + current.createNewFile(); + } catch (IOException e) { + e.printStackTrace(); + } + + File version = new File(meta + "VERSION"); + try { + version.createNewFile(); + String line1 = "#Mon Feb 02 13:59:54 CST 2015\n"; + String line2 = "clusterId=966271669"; + FileWriter fw = new FileWriter(version); + fw.write(line1); + fw.write(line2); + fw.flush(); + fw.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public void deleteDir() { + File dir = new File(meta); + if (dir.exists()) { + File[] files = dir.listFiles(); + for (File file : files) { + if (file.isFile()) { + file.delete(); + } + } + + dir.delete(); + } + } + + @Test + public void testConstruct() { + Storage storage1 = new Storage(1, "token", "test"); + Assert.assertEquals(1, storage1.getClusterID()); + Assert.assertEquals("test", storage1.getMetaDir()); + + Storage storage2 = new Storage(1, "token", 2, 3, "test"); + Assert.assertEquals(1, storage2.getClusterID()); + Assert.assertEquals(2, storage2.getImageSeq()); + Assert.assertEquals(3, storage2.getEditsSeq()); + Assert.assertEquals("test", storage2.getMetaDir()); + } + + @Test + public void testStorage() throws Exception { + mkdir(); + addFiles(0, 10); + + Storage storage = new Storage("storageTestDir"); + Assert.assertEquals(966271669, storage.getClusterID()); + storage.setClusterID(1234); + Assert.assertEquals(1234, storage.getClusterID()); + Assert.assertEquals(0, storage.getImageSeq()); + Assert.assertEquals(10, Storage.getMetaSeq(new File("storageTestDir/edits.10"))); + Assert.assertTrue(Storage.getCurrentEditsFile(new File("storageTestDir")) + .equals(new File("storageTestDir/edits"))); + + Assert.assertTrue(storage.getCurrentImageFile().equals(new File("storageTestDir/image.0"))); + Assert.assertTrue(storage.getImageFile(0).equals(new File("storageTestDir/image.0"))); + 
Assert.assertTrue(Storage.getImageFile(new File("storageTestDir"), 0) + .equals(new File("storageTestDir/image.0"))); + + Assert.assertTrue(storage.getCurrentEditsFile().equals(new File("storageTestDir/edits"))); + Assert.assertTrue(storage.getEditsFile(5).equals(new File("storageTestDir/edits.5"))); + Assert.assertTrue(Storage.getEditsFile(new File("storageTestDir"), 3) + .equals(new File("storageTestDir/edits.3"))); + + Assert.assertTrue(storage.getVersionFile().equals(new File("storageTestDir/VERSION"))); + + storage.setImageSeq(100); + Assert.assertEquals(100, storage.getImageSeq()); + + storage.setEditsSeq(100); + Assert.assertEquals(100, storage.getEditsSeq()); + + Assert.assertEquals("storageTestDir", storage.getMetaDir()); + storage.setMetaDir("abcd"); + Assert.assertEquals("abcd", storage.getMetaDir()); + + storage.setMetaDir("storageTestDir"); + storage.clear(); + File file = new File(storage.getMetaDir()); + Assert.assertEquals(0, file.list().length); + + deleteDir(); + } +} diff --git a/gensrc/thrift/HeartbeatService.thrift b/gensrc/thrift/HeartbeatService.thrift index 91f2133e764890..683154d1fbe9df 100644 --- a/gensrc/thrift/HeartbeatService.thrift +++ b/gensrc/thrift/HeartbeatService.thrift @@ -28,6 +28,7 @@ struct TMasterInfo { 1: required Types.TNetworkAddress network_address 2: required Types.TClusterId cluster_id 3: required Types.TEpoch epoch + 4: optional string token } struct TBackendInfo {