From c7e2da8b0699e3b292a7561a173f35e0fa399625 Mon Sep 17 00:00:00 2001
From: morningman
Date: Tue, 6 Oct 2020 20:37:11 +0800
Subject: [PATCH 1/3] Propagate channel close errors and attach host info to
 RPC error messages

---
 be/src/runtime/data_spliter.cpp                 | 16 +------
 be/src/runtime/data_stream_sender.cpp           | 45 +++++++++++++------
 be/src/runtime/dpp_sink_internal.cpp            |  6 +--
 be/src/runtime/dpp_sink_internal.h              |  4 +-
 .../java/org/apache/doris/qe/Coordinator.java   |  2 +-
 .../org/apache/doris/rpc/RpcException.java      | 16 ++++++-
 6 files changed, 54 insertions(+), 35 deletions(-)

diff --git a/be/src/runtime/data_spliter.cpp b/be/src/runtime/data_spliter.cpp
index 778e49e05c4f8a..f279f2ca5d6acb 100644
--- a/be/src/runtime/data_spliter.cpp
+++ b/be/src/runtime/data_spliter.cpp
@@ -308,22 +308,10 @@ Status DataSpliter::close(RuntimeState* state, Status close_status) {
     }
     Expr::close(_partition_expr_ctxs, state);
     for (auto& iter : _rollup_map) {
-        Status status = iter.second->close(state);
-        if (UNLIKELY(is_ok && !status.ok())) {
-            LOG(WARNING) << "close rollup_map error"
-                         << " err_msg=" << status.get_error_msg();
-            is_ok = false;
-            err_status = status;
-        }
+        iter.second->close(state);
     }
     for (auto iter : _partition_infos) {
-        Status status = iter->close(state);
-        if (UNLIKELY(is_ok && !status.ok())) {
-            LOG(WARNING) << "close partition_info error"
-                         << " err_msg=" << status.get_error_msg();
-            is_ok = false;
-            err_status = status;
-        }
+        iter->close(state);
     }
 
     _expr_mem_tracker.reset();
diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp
index 543c1f22ed63e1..853e0a945a7391 100644
--- a/be/src/runtime/data_stream_sender.cpp
+++ b/be/src/runtime/data_stream_sender.cpp
@@ -33,6 +33,7 @@
 #include "runtime/client_cache.h"
 #include "runtime/dpp_sink_internal.h"
 #include "runtime/mem_tracker.h"
+#include "service/backend_options.h"
 #include "util/debug_util.h"
 #include "util/network_util.h"
 #include "util/thrift_client.h"
@@ -114,10 +115,10 @@ class DataStreamSender::Channel {
     // of close operation, client should call close_wait() to finish channel's close.
     // We split one close operation into two phases in order to make multiple channels
     // can run parallel.
-    void close(RuntimeState* state);
+    Status close(RuntimeState* state);
 
     // Get close wait's response, to finish channel close operation.
-    void close_wait(RuntimeState* state);
+    Status close_wait(RuntimeState* state);
 
     int64_t num_data_bytes_sent() const {
         return _num_data_bytes_sent;
     }
@@ -137,9 +138,11 @@ class DataStreamSender::Channel {
         auto cntl = &_closure->cntl;
         brpc::Join(cntl->call_id());
         if (cntl->Failed()) {
-            LOG(WARNING) << "failed to send brpc batch, error=" << berror(cntl->ErrorCode())
-                         << ", error_text=" << cntl->ErrorText();
-            return Status::ThriftRpcError("failed to send batch");
+            std::stringstream ss;
+            ss << "failed to send brpc batch, error=" << berror(cntl->ErrorCode())
+               << ", error_text=" << cntl->ErrorText() << ", client: " << BackendOptions::get_localhost();
+            LOG(WARNING) << ss.str();
+            return Status::ThriftRpcError(ss.str());
         }
         return Status::OK();
     }
@@ -296,16 +299,25 @@ Status DataStreamSender::Channel::close_internal() {
     return Status::OK();
 }
 
-void DataStreamSender::Channel::close(RuntimeState* state) {
-    state->log_error(close_internal().get_error_msg());
+Status DataStreamSender::Channel::close(RuntimeState* state) {
+    Status st = close_internal();
+    if (!st.ok()) {
+        state->log_error(st.get_error_msg());
+    }
+    return st;
 }
 
-void DataStreamSender::Channel::close_wait(RuntimeState* state) {
+Status DataStreamSender::Channel::close_wait(RuntimeState* state) {
     if (_need_close) {
-        state->log_error(_wait_last_brpc().get_error_msg());
+        Status st = _wait_last_brpc();
+        if (!st.ok()) {
+            state->log_error(st.get_error_msg());
+        }
         _need_close = false;
+        return st;
     }
     _batch.reset();
+    return Status::OK();
 }
 
 DataStreamSender::DataStreamSender(
@@ -606,19 +618,26 @@ Status DataStreamSender::compute_range_part_code(
 Status DataStreamSender::close(RuntimeState* state, Status exec_status) {
     // TODO: only close channels that didn't have any errors
     // make all channels close parallel
+    Status final_st = Status::OK();
     for (int i = 0; i < _channels.size(); ++i) {
-        _channels[i]->close(state);
+        Status st = _channels[i]->close(state);
+        if (!st.ok() && final_st.ok()) {
+            final_st = st;
+        }
     }
     // wait all channels to finish
    for (int i = 0; i < _channels.size(); ++i) {
-        _channels[i]->close_wait(state);
+        Status st = _channels[i]->close_wait(state);
+        if (!st.ok() && final_st.ok()) {
+            final_st = st;
+        }
    }
     for (auto iter : _partition_infos) {
-        RETURN_IF_ERROR(iter->close(state));
+        iter->close(state);
     }
     Expr::close(_partition_expr_ctxs, state);
-    return Status::OK();
+    return final_st;
 }
 
 template
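Note on the two-phase shutdown above: close() starts an asynchronous close on every channel and close_wait() joins them afterwards, so channels tear down in parallel while the first failure is still reported to the caller. A minimal Java sketch of that collect-first-error pattern (the Channel interface and names here are illustrative, not the BE API):

    import java.util.List;

    interface Channel {
        // start an async close; returns null on success, the error otherwise
        Exception close();
        // wait for the close RPC to finish; returns null on success
        Exception closeWait();
    }

    final class CloseAll {
        // Close all channels in parallel and keep only the first error,
        // mirroring the new DataStreamSender::close() above.
        static Exception closeAll(List<Channel> channels) {
            Exception first = null;
            for (Channel c : channels) {            // phase 1: kick off every close
                Exception e = c.close();
                if (e != null && first == null) {
                    first = e;
                }
            }
            for (Channel c : channels) {            // phase 2: join every close
                Exception e = c.closeWait();
                if (e != null && first == null) {
                    first = e;
                }
            }
            return first;                           // null means all channels closed cleanly
        }
    }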
diff --git a/be/src/runtime/dpp_sink_internal.cpp b/be/src/runtime/dpp_sink_internal.cpp
index d2d5b19ed38e7f..84264f55c53792 100644
--- a/be/src/runtime/dpp_sink_internal.cpp
+++ b/be/src/runtime/dpp_sink_internal.cpp
@@ -82,10 +82,9 @@ Status RollupSchema::open(RuntimeState* state) {
     return Status::OK();
 }
 
-Status RollupSchema::close(RuntimeState* state) {
+void RollupSchema::close(RuntimeState* state) {
     Expr::close(_key_ctxs, state);
     Expr::close(_value_ctxs, state);
-    return Status::OK();
 }
 
 Status PartRangeKey::from_thrift(
@@ -237,11 +236,10 @@ Status PartitionInfo::open(RuntimeState* state) {
     return Status::OK();
 }
 
-Status PartitionInfo::close(RuntimeState* state) {
+void PartitionInfo::close(RuntimeState* state) {
     if (_distributed_expr_ctxs.size() > 0) {
         Expr::close(_distributed_expr_ctxs, state);
     }
-    return Status::OK();
 }
 
 }
diff --git a/be/src/runtime/dpp_sink_internal.h b/be/src/runtime/dpp_sink_internal.h
index af093b8b2500b0..4e67ad0f5f9ed0 100644
--- a/be/src/runtime/dpp_sink_internal.h
+++ b/be/src/runtime/dpp_sink_internal.h
@@ -53,7 +53,7 @@ class RollupSchema {
 
     Status open(RuntimeState* state);
 
-    Status close(RuntimeState* state);
+    void close(RuntimeState* state);
 
     const std::string& keys_type() const {
         return _keys_type;
@@ -263,7 +263,7 @@ class PartitionInfo {
 
     Status open(RuntimeState* state);
 
-    Status close(RuntimeState* state);
+    void close(RuntimeState* state);
 
     int64_t id() const {
         return _id;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 89f60665ba5ab8..e450f23220d3d1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -668,7 +668,7 @@ public RowBatch getNext() throws Exception {
             copyStatus.rewriteErrorMsg();
         }
         if (copyStatus.isRpcError()) {
-            throw new RpcException("unknown", copyStatus.getErrorMsg());
+            throw new RpcException(null, copyStatus.getErrorMsg());
         } else {
             String errMsg = copyStatus.getErrorMsg();
             LOG.warn("query failed: {}", errMsg);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/RpcException.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/RpcException.java
index 8648b1c163e972..d65802633b896a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/RpcException.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/RpcException.java
@@ -17,9 +17,23 @@
 
 package org.apache.doris.rpc;
 
+import com.google.common.base.Strings;
+
 public class RpcException extends Exception {
+    private String host;
+
     public RpcException(String host, String message) {
-        super(message + ", host: " + host);
+        super(message);
+        this.host = host;
+    }
+
+    @Override
+    public String getMessage() {
+        if (Strings.isNullOrEmpty(host)) {
+            return super.getMessage();
+        }
+        return super.getMessage() + ", host: " + host;
     }
 }
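Note on the RpcException change above: the host is now stored as a field and appended lazily in getMessage(), so callers that have no host (the Coordinator now passes null) get a clean message, while callers that do get a ", host: ..." suffix. A short usage sketch (Strings.isNullOrEmpty comes from Guava, as in the patch; the main method is illustrative):

    import com.google.common.base.Strings;

    public class RpcException extends Exception {
        private final String host;

        public RpcException(String host, String message) {
            super(message);
            this.host = host;
        }

        @Override
        public String getMessage() {
            // append the host only when one was supplied
            if (Strings.isNullOrEmpty(host)) {
                return super.getMessage();
            }
            return super.getMessage() + ", host: " + host;
        }

        public static void main(String[] args) {
            System.out.println(new RpcException(null, "rpc failed").getMessage());       // rpc failed
            System.out.println(new RpcException("10.0.0.1", "rpc failed").getMessage()); // rpc failed, host: 10.0.0.1
        }
    }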
backend id: " + pair.first.backend.getId(); } @@ -524,7 +532,7 @@ public void exec() throws Exception { case TIMEOUT: throw new UserException("query timeout. backend id: " + pair.first.backend.getId()); case THRIFT_RPC_ERROR: - SimpleScheduler.addToBlacklist(pair.first.backend.getId()); + SimpleScheduler.addToBlacklist(pair.first.backend.getId(), errMsg); throw new RpcException(pair.first.backend.getHost(), "rpc failed"); default: throw new UserException(errMsg); @@ -1205,9 +1213,6 @@ private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLoc int randomLocation = new Random().nextInt(seqLocation.locations.size()); Reference backendIdRef = new Reference(); TNetworkAddress execHostPort = SimpleScheduler.getHost(seqLocation.locations.get(randomLocation).backend_id, seqLocation.locations, this.idToBackend, backendIdRef); - if (execHostPort == null) { - throw new UserException("there is no scanNode Backend"); - } this.addressToBackendID.put(execHostPort, backendIdRef.getRef()); this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, execHostPort); } @@ -1236,9 +1241,6 @@ private void computeScanRangeAssignmentByScheduler( Reference backendIdRef = new Reference(); TNetworkAddress execHostPort = SimpleScheduler.getHost(minLocation.backend_id, scanRangeLocations.getLocations(), this.idToBackend, backendIdRef); - if (execHostPort == null) { - throw new UserException("there is no scanNode Backend"); - } this.addressToBackendID.put(execHostPort, backendIdRef.getRef()); Map> scanRanges = findOrInsert(assignment, execHostPort, @@ -1482,7 +1484,7 @@ public synchronized boolean cancelFragmentInstance(PPlanFragmentCancelReason can } catch (RpcException e) { LOG.warn("cancel plan fragment get a exception, address={}:{}", brpcAddress.getHostname(), brpcAddress.getPort()); - SimpleScheduler.addToBlacklist(addressToBackendID.get(brpcAddress)); + SimpleScheduler.addToBlacklist(addressToBackendID.get(brpcAddress), e.getMessage()); } this.hasCanceled = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java index 857b76cb3f1939..916654307039d2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java @@ -116,7 +116,7 @@ public RowBatch getNext(Status status) throws TException { } catch (RpcException e) { LOG.warn("fetch result rpc exception, finstId={}", finstId, e); status.setRpcStatus(e.getMessage()); - SimpleScheduler.addToBlacklist(backendId); + SimpleScheduler.addToBlacklist(backendId, e.getMessage()); } catch (ExecutionException e) { LOG.warn("fetch result execution exception, finstId={}", finstId, e); if (e.getMessage().contains("time out")) { @@ -124,7 +124,7 @@ public RowBatch getNext(Status status) throws TException { status.setStatus(new Status(TStatusCode.TIMEOUT, e.getMessage())); } else { status.setRpcStatus(e.getMessage()); - SimpleScheduler.addToBlacklist(backendId); + SimpleScheduler.addToBlacklist(backendId, e.getMessage()); } } catch (TimeoutException e) { LOG.warn("fetch result timeout, finstId={}", finstId, e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java index f8388b8421016a..b790a20970dd80 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java @@ -19,7 +19,9 @@ import 
org.apache.doris.catalog.Catalog; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.Pair; import org.apache.doris.common.Reference; +import org.apache.doris.common.UserException; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TNetworkAddress; @@ -29,6 +31,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -36,79 +39,99 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; public class SimpleScheduler { private static AtomicLong nextId = new AtomicLong(0); private static final Logger LOG = LogManager.getLogger(SimpleScheduler.class); - private static Map blacklistBackends = Maps.newHashMap(); - private static Lock lock = new ReentrantLock(); + // backend id -> (try time, reason) + // There will be multi threads to read and modify this map. + // But only one thread (UpdateBlacklistThread) will modify the `Pair`. + // So using concurrenty map is enough + private static Map> blacklistBackends = Maps.newConcurrentMap(); private static UpdateBlacklistThread updateBlacklistThread; static { updateBlacklistThread = new UpdateBlacklistThread(); updateBlacklistThread.start(); } - - public static TNetworkAddress getHost(long backendId, + + public static TNetworkAddress getHost(long backendId, List locations, ImmutableMap backends, - Reference backendIdRef) { - if (locations == null || backends == null) { - return null; + Reference backendIdRef) + throws UserException { + if (CollectionUtils.isEmpty(locations) || backends == null || backends.isEmpty()) { + throw new UserException("scan range location or candidate backends is empty"); } LOG.debug("getHost backendID={}, backendSize={}", backendId, backends.size()); Backend backend = backends.get(backendId); - lock.lock(); - try { - if (backend != null && backend.isAlive() && !blacklistBackends.containsKey(backendId)) { - backendIdRef.setRef(backendId); - return new TNetworkAddress(backend.getHost(), backend.getBePort()); - } else { - for (TScanRangeLocation location : locations) { - if (location.backend_id == backendId) { - continue; - } - // choose the first alive backend(in analysis stage, the locations are random) - Backend candidateBackend = backends.get(location.backend_id); - if (candidateBackend != null && candidateBackend.isAlive() - && !blacklistBackends.containsKey(location.backend_id)) { - backendIdRef.setRef(location.backend_id); - return new TNetworkAddress(candidateBackend.getHost(), candidateBackend.getBePort()); - } + + if (backend != null && backend.isAlive() && !blacklistBackends.containsKey(backendId)) { + backendIdRef.setRef(backendId); + return new TNetworkAddress(backend.getHost(), backend.getBePort()); + } else { + for (TScanRangeLocation location : locations) { + if (location.backend_id == backendId) { + continue; + } + // choose the first alive backend(in analysis stage, the locations are random) + Backend candidateBackend = backends.get(location.backend_id); + if (candidateBackend != null && candidateBackend.isAlive() + && !blacklistBackends.containsKey(location.backend_id)) { + backendIdRef.setRef(location.backend_id); + return new TNetworkAddress(candidateBackend.getHost(), 
candidateBackend.getBePort()); } - } - } finally { - lock.unlock(); + } } + // no backend returned - return null; + throw new UserException("there is no scanNode Backend. " + + getBackendErrorMsg(locations.stream().map(l -> l.backend_id).collect(Collectors.toList()), + backends, locations.size())); } - + + // get the reason why backends can not be chosen. + private static String getBackendErrorMsg(List backendIds, ImmutableMap backends, int limit) { + List res = Lists.newArrayList(); + for (int i = 0; i < backendIds.size() && i < limit; i++) { + long beId = backendIds.get(i); + Backend be = backends.get(beId); + if (be == null) { + res.add(beId + ": not exist"); + } else if (!be.isAlive()) { + res.add(beId + ": not alive"); + } else if (blacklistBackends.containsKey(beId)) { + Pair pair = blacklistBackends.get(beId); + res.add(beId + ": in black list(" + (pair == null ? "unknown" : pair.second) + ")"); + } else { + res.add(beId + ": unknown"); + } + } + return res.toString(); + } + public static TNetworkAddress getHost(ImmutableMap backends, - Reference backendIdRef) { - if (backends == null) { - return null; + Reference backendIdRef) + throws UserException { + if (backends == null || backends.isEmpty()) { + throw new UserException("candidate backends is empty"); } int backendSize = backends.size(); - if (backendSize == 0) { - return null; - } long id = nextId.getAndIncrement() % backendSize; List idToBackendId = Lists.newArrayList(); idToBackendId.addAll(backends.keySet()); Long backendId = idToBackendId.get((int) id); Backend backend = backends.get(backendId); - + if (backend != null && backend.isAlive() && !blacklistBackends.containsKey(backendId)) { backendIdRef.setRef(backendId); return new TNetworkAddress(backend.getHost(), backend.getBePort()); } else { long candidateId = id + 1; // get next candidate id - for (int i = 0; i < backendSize; i ++, candidateId ++) { + for (int i = 0; i < backendSize; i++, candidateId++) { LOG.debug("i={} candidatedId={}", i, candidateId); if (candidateId >= backendSize) { candidateId = 0; @@ -127,21 +150,17 @@ public static TNetworkAddress getHost(ImmutableMap backends, } } // no backend returned - return null; + throw new UserException("there is no scanNode Backend. " + + getBackendErrorMsg(Lists.newArrayList(backends.keySet()), backends, 3)); } - - public static void addToBlacklist(Long backendID) { + + public static void addToBlacklist(Long backendID, String reason) { if (backendID == null) { return; } - lock.lock(); - try { - int tryTime = FeConstants.heartbeat_interval_second + 1; - blacklistBackends.put(backendID, tryTime); - LOG.warn("add black list " + backendID); - } finally { - lock.unlock(); - } + + blacklistBackends.put(backendID, Pair.create(FeConstants.heartbeat_interval_second + 1, reason)); + LOG.warn("add backend {} to black list. reason: {}", backendID, reason); } public static boolean isAlive(Backend backend) { @@ -169,38 +188,34 @@ public void run() { Thread.sleep(1000L); SystemInfoService clusterInfoService = Catalog.getCurrentSystemInfo(); LOG.debug("UpdateBlacklistThread retry begin"); - lock.lock(); - try { - Iterator> iterator = blacklistBackends.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - Long backendId = entry.getKey(); - - // remove from blacklist if - // 1. backend does not exist anymore - // 2. 
backend is alive - if (clusterInfoService.getBackend(backendId) == null - || clusterInfoService.checkBackendAvailable(backendId)) { + + Iterator>> iterator = blacklistBackends.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry> entry = iterator.next(); + Long backendId = entry.getKey(); + + // remove from blacklist if + // 1. backend does not exist anymore + // 2. backend is alive + if (clusterInfoService.getBackend(backendId) == null + || clusterInfoService.checkBackendAvailable(backendId)) { + iterator.remove(); + LOG.debug("remove backendID {} which is alive", backendId); + } else { + // 3. max try time is reach + Integer retryTimes = entry.getValue().first; + retryTimes = retryTimes - 1; + if (retryTimes <= 0) { iterator.remove(); - LOG.debug("remove backendID {} which is alive", backendId); + LOG.warn("remove backendID {}. reach max try time", backendId); } else { - // 3. max try time is reach - Integer retryTimes = entry.getValue(); - retryTimes = retryTimes - 1; - if (retryTimes <= 0) { - iterator.remove(); - LOG.warn("remove backendID {}. reach max try time", backendId); - } else { - entry.setValue(retryTimes); - LOG.debug("blacklistBackends backendID={} retryTimes={}", backendId, retryTimes); - } + LOG.debug("blacklistBackends backendID={} retryTimes={}", backendId, retryTimes); } } - } finally { - lock.unlock(); - LOG.debug("UpdateBlacklistThread retry end"); } - + + LOG.debug("UpdateBlacklistThread retry end"); + } catch (Throwable ex) { LOG.warn("blacklist thread exception" + ex); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java index cd989709484f63..064c86b3b5679c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java @@ -17,30 +17,30 @@ package org.apache.doris.qe.cache; -import org.apache.doris.proto.PCacheStatus; +import org.apache.doris.common.Status; import org.apache.doris.proto.PCacheResponse; -import org.apache.doris.proto.PUpdateCacheRequest; +import org.apache.doris.proto.PCacheStatus; +import org.apache.doris.proto.PClearCacheRequest; +import org.apache.doris.proto.PClearType; import org.apache.doris.proto.PFetchCacheRequest; import org.apache.doris.proto.PFetchCacheResult; -import org.apache.doris.proto.PClearType; -import org.apache.doris.proto.PClearCacheRequest; +import org.apache.doris.proto.PUniqueId; +import org.apache.doris.proto.PUpdateCacheRequest; import org.apache.doris.qe.SimpleScheduler; import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.rpc.RpcException; import org.apache.doris.system.Backend; -import org.apache.doris.common.Status; -import org.apache.doris.proto.PUniqueId; import org.apache.doris.thrift.TNetworkAddress; - import org.apache.doris.thrift.TStatusCode; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; -import java.util.List; /** * Encapsulates access to BE, including network and other exception handling @@ -66,9 +66,9 @@ public void updateCache(UpdateCacheRequest request, int timeoutMs, Status status status.setStatus(response.status.toString()); } } catch (Exception e) { - LOG.warn("update cache exception, sqlKey {}, e {}", 
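Note on dropping the explicit lock above: readers only call containsKey/get, and the single UpdateBlacklistThread is the only writer that mutates or removes entries, so a ConcurrentHashMap with its weakly consistent iterator is sufficient. A stripped-down sketch of that decrement-and-expire loop (simplified value type, illustrative names):

    import java.util.Iterator;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    final class Blacklist {
        // backend id -> remaining tries before the entry expires
        private final Map<Long, Integer> entries = new ConcurrentHashMap<>();

        void add(long backendId, int tries) {
            entries.put(backendId, tries);
        }

        boolean contains(long backendId) {
            return entries.containsKey(backendId);
        }

        // Called roughly once per second by a single maintenance thread.
        void tick() {
            Iterator<Map.Entry<Long, Integer>> it = entries.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Integer> e = it.next();
                int left = e.getValue() - 1;
                if (left <= 0) {
                    it.remove();      // expired: give the backend another chance
                } else {
                    e.setValue(left); // safe: only this thread modifies values
                }
            }
        }
    }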
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java
index cd989709484f63..064c86b3b5679c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheBeProxy.java
@@ -17,30 +17,30 @@
 
 package org.apache.doris.qe.cache;
 
-import org.apache.doris.proto.PCacheStatus;
+import org.apache.doris.common.Status;
 import org.apache.doris.proto.PCacheResponse;
-import org.apache.doris.proto.PUpdateCacheRequest;
+import org.apache.doris.proto.PCacheStatus;
+import org.apache.doris.proto.PClearCacheRequest;
+import org.apache.doris.proto.PClearType;
 import org.apache.doris.proto.PFetchCacheRequest;
 import org.apache.doris.proto.PFetchCacheResult;
-import org.apache.doris.proto.PClearType;
-import org.apache.doris.proto.PClearCacheRequest;
+import org.apache.doris.proto.PUniqueId;
+import org.apache.doris.proto.PUpdateCacheRequest;
 import org.apache.doris.qe.SimpleScheduler;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.system.Backend;
-import org.apache.doris.common.Status;
-import org.apache.doris.proto.PUniqueId;
 import org.apache.doris.thrift.TNetworkAddress;
-
 import org.apache.doris.thrift.TStatusCode;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.List;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
-import java.util.List;
 
 /**
  * Encapsulates access to BE, including network and other exception handling
@@ -66,9 +66,9 @@ public void updateCache(UpdateCacheRequest request, int timeoutMs, Status status) {
                 status.setStatus(response.status.toString());
             }
         } catch (Exception e) {
-            LOG.warn("update cache exception, sqlKey {}, e {}", sqlKey, e);
+            LOG.warn("update cache exception, sqlKey {}", sqlKey, e);
             status.setRpcStatus(e.getMessage());
-            SimpleScheduler.addToBlacklist(backend.getId());
+            SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage());
         }
     }
 
@@ -104,7 +104,7 @@ public FetchCacheResult fetchCache(FetchCacheRequest request, int timeoutMs, Status status) {
         } catch (RpcException e) {
             LOG.warn("fetch catch rpc exception, sqlKey {}, backend {}", sqlKey, backend.getId(), e);
             status.setRpcStatus(e.getMessage());
-            SimpleScheduler.addToBlacklist(backend.getId());
+            SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage());
         } catch (InterruptedException e) {
             LOG.warn("future get interrupted exception, sqlKey {}, backend {}", sqlKey, backend.getId(), e);
             status.setStatus("interrupted exception");
@@ -136,8 +136,9 @@ public void clearCache(PClearCacheRequest request, List<Backend> beList) {
             }
         }
         if (retry >= 3) {
-            LOG.warn("clear cache timeout, backend {}", backend.getId());
-            SimpleScheduler.addToBlacklist(backend.getId());
+            String errMsg = "clear cache timeout, backend " + backend.getId();
+            LOG.warn(errMsg);
+            SimpleScheduler.addToBlacklist(backend.getId(), errMsg);
         }
     }
 }
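Note on the clearCache change above: the timeout message is built once and the same string feeds both the log and the blacklist, so the scheduler can later explain why the backend was skipped. The retry-then-blacklist shape, sketched with a hypothetical RPC callback (illustrative names, not the real proxy API):

    final class ClearCacheRetry {
        interface Rpc {
            boolean tryClear(long backendId) throws Exception;
        }

        interface BlacklistSink {
            void addToBlacklist(long backendId, String reason);
        }

        static void clearWithRetry(Rpc rpc, long backendId, BlacklistSink sink) {
            for (int retry = 0; retry < 3; retry++) {
                try {
                    if (rpc.tryClear(backendId)) {
                        return; // success
                    }
                } catch (Exception ignored) {
                    // fall through and retry
                }
            }
            // after three failed attempts, record the reason alongside the id
            String errMsg = "clear cache timeout, backend " + backendId;
            sink.addToBlacklist(backendId, errMsg);
        }
    }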
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java
index d0342b9032558f..a9279353cc1b79 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java
@@ -19,9 +19,11 @@
 
 import mockit.Expectations;
 import mockit.Mocked;
+
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Reference;
+import org.apache.doris.common.UserException;
 import org.apache.doris.persist.EditLog;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TNetworkAddress;
@@ -108,18 +110,26 @@ public void testGetHostWithBackendId() {
         threeBackends.put((long) 2, backendC);
         ImmutableMap<Long, Backend> immutableThreeBackends = ImmutableMap.copyOf(threeBackends);
 
-        { // null Backends
-            address = SimpleScheduler.getHost(Long.valueOf(0), nullLocations,
-                    nullBackends, ref);
-            Assert.assertNull(address);
+        // null Backends
+        try {
+            SimpleScheduler.getHost(Long.valueOf(0), nullLocations,
+                    nullBackends, ref);
+            Assert.fail();
+        } catch (UserException e) {
+
         }
-        { // empty Backends
-            address = SimpleScheduler.getHost(Long.valueOf(0), emptyLocations,
-                    emptyBackends, ref);
-            Assert.assertNull(address);
+
+        // empty Backends
+        try {
+            SimpleScheduler.getHost(Long.valueOf(0), emptyLocations,
+                    emptyBackends, ref);
+            Assert.fail();
+        } catch (UserException e) {
+
         }
-        { // normal Backends
+
+        // normal Backends
+        try {
             // BackendId exists
             Assert.assertEquals(SimpleScheduler.getHost(0, emptyLocations, immutableThreeBackends, ref)
                     .hostname, "addressA");
             Assert.assertEquals(SimpleScheduler.getHost(2, emptyLocations, immutableThreeBackends, ref)
                     .hostname, "addressC");
@@ -129,10 +139,17 @@ public void testGetHostWithBackendId() {
             // BacknedId not exists and location exists, choose the locations's first
             Assert.assertEquals(SimpleScheduler.getHost(3, twoLocations, immutableThreeBackends, ref)
                     .hostname, "addressA");
+        } catch (UserException e) {
+            Assert.fail(e.getMessage());
         }
-        { // abnormal
+
+        // abnormal
+        try {
             // BackendId not exists and location not exists
-            Assert.assertNull(SimpleScheduler.getHost(3, emptyLocations, immutableThreeBackends, ref));
+            SimpleScheduler.getHost(3, emptyLocations, immutableThreeBackends, ref);
+            Assert.fail();
+        } catch (UserException e) {
+
         }
     }
 
@@ -157,11 +174,23 @@ public void testGetHostWithNoParams() {
         threeBackends.put((long) 2, backendC);
         ImmutableMap<Long, Backend> immutableThreeBackends = ImmutableMap.copyOf(threeBackends);
 
-        { // abnormal
-            Assert.assertNull(SimpleScheduler.getHost(nullBackends, ref));
-            Assert.assertNull(SimpleScheduler.getHost(emptyBackends, ref));
-        } // normal
-        {
+        // abnormal
+        try {
+            SimpleScheduler.getHost(nullBackends, ref);
+            Assert.fail();
+        } catch (UserException e) {
+
+        }
+
+        try {
+            SimpleScheduler.getHost(emptyBackends, ref);
+            Assert.fail();
+        } catch (UserException e) {
+
+        }
+
+        // normal
+        try {
             String a = SimpleScheduler.getHost(immutableThreeBackends, ref).hostname;
             String b = SimpleScheduler.getHost(immutableThreeBackends, ref).hostname;
             String c = SimpleScheduler.getHost(immutableThreeBackends, ref).hostname;
@@ -170,13 +199,15 @@ public void testGetHostWithNoParams() {
             Assert.assertTrue(!a.equals(b) && !a.equals(c) && !b.equals(c));
             a = SimpleScheduler.getHost(immutableThreeBackends, ref).hostname;
             b = SimpleScheduler.getHost(immutableThreeBackends, ref).hostname;
             c = SimpleScheduler.getHost(immutableThreeBackends, ref).hostname;
             Assert.assertTrue(!a.equals(b) && !a.equals(c) && !b.equals(c));
+        } catch (UserException e) {
+            Assert.fail(e.getMessage());
         }
     }
 
     // TODO(lingbin): PALO-2051.
     // Comment out these code temporatily.
     // @Test
-    public void testBlackList() {
+    public void testBlackList() throws UserException {
         FeConstants.heartbeat_interval_second = Integer.MAX_VALUE;
         TNetworkAddress address = null;
 
@@ -192,12 +223,12 @@ public void testBlackList() {
         threeBackends.put((long) 102, backendC);
         ImmutableMap<Long, Backend> immutableThreeBackends = ImmutableMap.copyOf(threeBackends);
 
-        SimpleScheduler.addToBlacklist(Long.valueOf(100));
-        SimpleScheduler.addToBlacklist(Long.valueOf(101));
+        SimpleScheduler.addToBlacklist(Long.valueOf(100), "test");
+        SimpleScheduler.addToBlacklist(Long.valueOf(101), "test");
         address = SimpleScheduler.getHost(immutableThreeBackends, ref);
         // only backendc can work
         Assert.assertEquals(address.hostname, "addressC");
-        SimpleScheduler.addToBlacklist(Long.valueOf(102));
+        SimpleScheduler.addToBlacklist(Long.valueOf(102), "test");
         // no backend can work
         address = SimpleScheduler.getHost(immutableThreeBackends, ref);
         Assert.assertNull(address);
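Note on the getHost() contract change in this patch: callers such as Coordinator no longer null-check the returned address; a failed selection surfaces as a UserException whose message lists, per candidate, whether it was missing, dead, or blacklisted (with the stored reason). Typical caller shape after the change (a sketch using the signatures introduced above):

    import org.apache.doris.common.Reference;
    import org.apache.doris.common.UserException;
    import org.apache.doris.qe.SimpleScheduler;
    import org.apache.doris.system.Backend;
    import org.apache.doris.thrift.TNetworkAddress;

    import com.google.common.collect.ImmutableMap;

    final class HostPicker {
        // The caller simply propagates UserException; its message already reads like
        // "there is no scanNode Backend. [1000: not alive, 1001: in black list(...)]".
        static TNetworkAddress pick(ImmutableMap<Long, Backend> backends) throws UserException {
            Reference<Long> chosenId = new Reference<Long>();
            return SimpleScheduler.getHost(backends, chosenId); // throws instead of returning null
        }
    }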
reject task: " << UniqueId(task.id) + << ", job id: " << task.job_id + << ", queue size: " << _thread_pool.get_queue_size(); return Status::TooManyTasks(UniqueId(task.id).to_string()); } diff --git a/be/src/util/batch_process_thread_pool.hpp b/be/src/util/batch_process_thread_pool.hpp index e4148b556debae..cead1536c63204 100644 --- a/be/src/util/batch_process_thread_pool.hpp +++ b/be/src/util/batch_process_thread_pool.hpp @@ -129,7 +129,7 @@ class BatchProcessThreadPool { // the first task should blocking, or the tasks queue is empty has_task = _work_queue.blocking_get(&task); } else { - // the 2rd, 3rd... task shoudl non blocking get + // the 2rd, 3rd... task should non blocking get has_task = _work_queue.non_blocking_get(&task); if (!has_task) { break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java index b790a20970dd80..d2c88fe7c59725 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SimpleScheduler.java @@ -42,9 +42,10 @@ import java.util.stream.Collectors; public class SimpleScheduler { - private static AtomicLong nextId = new AtomicLong(0); private static final Logger LOG = LogManager.getLogger(SimpleScheduler.class); + private static AtomicLong nextId = new AtomicLong(0); + // backend id -> (try time, reason) // There will be multi threads to read and modify this map. // But only one thread (UpdateBlacklistThread) will modify the `Pair`. @@ -68,7 +69,7 @@ public static TNetworkAddress getHost(long backendId, LOG.debug("getHost backendID={}, backendSize={}", backendId, backends.size()); Backend backend = backends.get(backendId); - if (backend != null && backend.isAlive() && !blacklistBackends.containsKey(backendId)) { + if (isAvailable(backend)) { backendIdRef.setRef(backendId); return new TNetworkAddress(backend.getHost(), backend.getBePort()); } else { @@ -78,8 +79,7 @@ public static TNetworkAddress getHost(long backendId, } // choose the first alive backend(in analysis stage, the locations are random) Backend candidateBackend = backends.get(location.backend_id); - if (candidateBackend != null && candidateBackend.isAlive() - && !blacklistBackends.containsKey(location.backend_id)) { + if (isAvailable(candidateBackend)) { backendIdRef.setRef(location.backend_id); return new TNetworkAddress(candidateBackend.getHost(), candidateBackend.getBePort()); } @@ -92,26 +92,6 @@ public static TNetworkAddress getHost(long backendId, backends, locations.size())); } - // get the reason why backends can not be chosen. - private static String getBackendErrorMsg(List backendIds, ImmutableMap backends, int limit) { - List res = Lists.newArrayList(); - for (int i = 0; i < backendIds.size() && i < limit; i++) { - long beId = backendIds.get(i); - Backend be = backends.get(beId); - if (be == null) { - res.add(beId + ": not exist"); - } else if (!be.isAlive()) { - res.add(beId + ": not alive"); - } else if (blacklistBackends.containsKey(beId)) { - Pair pair = blacklistBackends.get(beId); - res.add(beId + ": in black list(" + (pair == null ? 
"unknown" : pair.second) + ")"); - } else { - res.add(beId + ": unknown"); - } - } - return res.toString(); - } - public static TNetworkAddress getHost(ImmutableMap backends, Reference backendIdRef) throws UserException { @@ -126,7 +106,7 @@ public static TNetworkAddress getHost(ImmutableMap backends, Long backendId = idToBackendId.get((int) id); Backend backend = backends.get(backendId); - if (backend != null && backend.isAlive() && !blacklistBackends.containsKey(backendId)) { + if (isAvailable(backend)) { backendIdRef.setRef(backendId); return new TNetworkAddress(backend.getHost(), backend.getBePort()); } else { @@ -142,8 +122,7 @@ public static TNetworkAddress getHost(ImmutableMap backends, Long candidatebackendId = idToBackendId.get((int) candidateId); LOG.debug("candidatebackendId={}", candidatebackendId); Backend candidateBackend = backends.get(candidatebackendId); - if (candidateBackend != null && candidateBackend.isAlive() - && !blacklistBackends.containsKey(candidatebackendId)) { + if (isAvailable(candidateBackend)) { backendIdRef.setRef(candidatebackendId); return new TNetworkAddress(candidateBackend.getHost(), candidateBackend.getBePort()); } @@ -154,6 +133,26 @@ public static TNetworkAddress getHost(ImmutableMap backends, + getBackendErrorMsg(Lists.newArrayList(backends.keySet()), backends, 3)); } + // get the reason why backends can not be chosen. + private static String getBackendErrorMsg(List backendIds, ImmutableMap backends, int limit) { + List res = Lists.newArrayList(); + for (int i = 0; i < backendIds.size() && i < limit; i++) { + long beId = backendIds.get(i); + Backend be = backends.get(beId); + if (be == null) { + res.add(beId + ": not exist"); + } else if (!be.isAlive()) { + res.add(beId + ": not alive"); + } else if (blacklistBackends.containsKey(beId)) { + Pair pair = blacklistBackends.get(beId); + res.add(beId + ": in black list(" + (pair == null ? "unknown" : pair.second) + ")"); + } else { + res.add(beId + ": unknown"); + } + } + return res.toString(); + } + public static void addToBlacklist(Long backendID, String reason) { if (backendID == null) { return; @@ -163,7 +162,7 @@ public static void addToBlacklist(Long backendID, String reason) { LOG.warn("add backend {} to black list. reason: {}", backendID, reason); } - public static boolean isAlive(Backend backend) { + public static boolean isAvailable(Backend backend) { return (backend != null && backend.isAlive() && !blacklistBackends.containsKey(backend.getId())); } @@ -194,22 +193,18 @@ public void run() { Map.Entry> entry = iterator.next(); Long backendId = entry.getKey(); - // remove from blacklist if - // 1. backend does not exist anymore - // 2. backend is alive - if (clusterInfoService.getBackend(backendId) == null - || clusterInfoService.checkBackendAvailable(backendId)) { + // remove from blacklist if backend does not exist anymore + if (clusterInfoService.getBackend(backendId) == null) { iterator.remove(); - LOG.debug("remove backendID {} which is alive", backendId); + LOG.info("remove backend {} from black list because it does not exist", backendId); } else { // 3. max try time is reach - Integer retryTimes = entry.getValue().first; - retryTimes = retryTimes - 1; - if (retryTimes <= 0) { + entry.getValue().first = entry.getValue().first - 1; + if (entry.getValue().first <= 0) { iterator.remove(); - LOG.warn("remove backendID {}. reach max try time", backendId); + LOG.warn("remove backend {} from black list. 
reach max try time", backendId); } else { - LOG.debug("blacklistBackends backendID={} retryTimes={}", backendId, retryTimes); + LOG.debug("blacklistBackends backendID={} retryTimes={}", backendId, entry.getValue().first); } } } @@ -217,7 +212,7 @@ public void run() { LOG.debug("UpdateBlacklistThread retry end"); } catch (Throwable ex) { - LOG.warn("blacklist thread exception" + ex); + LOG.warn("blacklist thread exception", ex); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java index 0bb157cfd20ac1..97a73012a9c9ba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheCoordinator.java @@ -17,22 +17,23 @@ package org.apache.doris.qe.cache; -import com.google.common.collect.Lists; import org.apache.doris.catalog.Catalog; +import org.apache.doris.proto.PUniqueId; import org.apache.doris.qe.SimpleScheduler; import org.apache.doris.system.Backend; -import org.apache.doris.proto.PUniqueId; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.HashMap; import java.util.Hashtable; +import java.util.Iterator; import java.util.List; import java.util.SortedMap; import java.util.TreeMap; -import java.util.Iterator; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -82,7 +83,7 @@ public Backend findBackend(PUniqueId sqlKey) { } Long key = tailMap.firstKey(); virtualNode = tailMap.get(key); - if (SimpleScheduler.isAlive(virtualNode)) { + if (SimpleScheduler.isAvailable(virtualNode)) { break; } else { LOG.debug("backend {} not alive, key {}, retry {}", virtualNode.getId(), key, retryTimes); diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java index a9279353cc1b79..09f8a10c79efb7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java @@ -17,220 +17,184 @@ package org.apache.doris.qe; -import mockit.Expectations; -import mockit.Mocked; - -import org.apache.doris.catalog.Catalog; import org.apache.doris.common.FeConstants; import org.apache.doris.common.Reference; import org.apache.doris.common.UserException; -import org.apache.doris.persist.EditLog; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TScanRangeLocation; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.junit.Assert; -import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; -import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; public class SimpleSchedulerTest { - static Reference ref = new Reference(); - - @Mocked - private Catalog catalog; - @Mocked - private EditLog editLog; - - @Before - public void setUp() { - new Expectations() { - { - editLog.logAddBackend((Backend) any); - minTimes = 0; - editLog.logDropBackend((Backend) any); - minTimes = 0; + private static Backend be1; + private static Backend be2; + private static Backend be3; + private static Backend be4; + private static Backend be5; + + 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java
index a9279353cc1b79..09f8a10c79efb7 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/SimpleSchedulerTest.java
@@ -17,220 +17,184 @@
 
 package org.apache.doris.qe;
 
-import mockit.Expectations;
-import mockit.Mocked;
-
-import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Reference;
 import org.apache.doris.common.UserException;
-import org.apache.doris.persist.EditLog;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TScanRangeLocation;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 import org.junit.Assert;
-import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class SimpleSchedulerTest {
-    static Reference<Long> ref = new Reference<Long>();
-
-    @Mocked
-    private Catalog catalog;
-    @Mocked
-    private EditLog editLog;
-
-    @Before
-    public void setUp() {
-        new Expectations() {
-            {
-                editLog.logAddBackend((Backend) any);
-                minTimes = 0;
-
-                editLog.logDropBackend((Backend) any);
-                minTimes = 0;
-
-                editLog.logBackendStateChange((Backend) any);
-                minTimes = 0;
-
-                catalog.getEditLog();
-                minTimes = 0;
-                result = editLog;
-            }
-        };
-
-        new Expectations(catalog) {
-            {
-                Catalog.getCurrentCatalog();
-                minTimes = 0;
-                result = catalog;
-            }
-        };
-    }
+    private static Backend be1;
+    private static Backend be2;
+    private static Backend be3;
+    private static Backend be4;
+    private static Backend be5;
+
+    @BeforeClass
+    public static void setUp() {
+        FeConstants.heartbeat_interval_second = 2;
+        be1 = new Backend(1000L, "192.168.100.0", 9050);
+        be2 = new Backend(1001L, "192.168.100.1", 9050);
+        be3 = new Backend(1002L, "192.168.100.2", 9050);
+        be4 = new Backend(1003L, "192.168.100.3", 9050);
+        be5 = new Backend(1004L, "192.168.100.4", 9050);
+        be1.setAlive(true);
+        be2.setAlive(true);
+        be3.setAlive(true);
+        be4.setAlive(true);
+        be5.setAlive(true);
+    }
+
+    private static Map<Long, Backend> genBackends() {
+        Map<Long, Backend> map = Maps.newHashMap();
+        map.put(be1.getId(), be1);
+        map.put(be2.getId(), be2);
+        map.put(be3.getId(), be3);
+        map.put(be4.getId(), be4);
+        map.put(be5.getId(), be5);
+        return map;
+    }
+
+    @Test
+    public void testGetHostNormal() throws UserException, InterruptedException {
+        Reference<Long> ref = new Reference<Long>();
+        ImmutableMap<Long, Backend> backends = ImmutableMap.copyOf(genBackends());
+
+        List<TScanRangeLocation> locations = Lists.newArrayList();
+        TScanRangeLocation scanRangeLocation1 = new TScanRangeLocation();
+        scanRangeLocation1.setBackendId(be1.getId());
+        locations.add(scanRangeLocation1);
+        TScanRangeLocation scanRangeLocation2 = new TScanRangeLocation();
+        scanRangeLocation2.setBackendId(be2.getId());
+        locations.add(scanRangeLocation2);
+
+        Thread t1 = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    boolean foundCandidate = false;
+                    long start = System.currentTimeMillis();
+                    for (int i = 0; i < 1000; i++) {
+                        TNetworkAddress address = SimpleScheduler.getHost(locations.get(0).backend_id, locations, backends, ref);
+                        Assert.assertNotNull(address);
+                        if (!foundCandidate && address.getHostname().equals(be2.getHost())) {
+                            foundCandidate = true;
+                        }
+                    }
+                    System.out.println("cost: " + (System.currentTimeMillis() - start));
+                    Assert.assertTrue(foundCandidate);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+
+        Thread t2 = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    Set<String> resBackends = Sets.newHashSet();
+                    long start = System.currentTimeMillis();
+                    for (int i = 0; i < 1000; i++) {
+                        TNetworkAddress address = SimpleScheduler.getHost(backends, ref);
+                        Assert.assertNotNull(address);
+                        resBackends.add(address.hostname);
+                    }
+                    System.out.println("cost: " + (System.currentTimeMillis() - start));
+                    Assert.assertTrue(resBackends.size() >= 4);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+
+        Thread t3 = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                SimpleScheduler.addToBlacklist(be1.getId(), "test");
+            }
+        });
+
+        t3.start();
+        t1.start();
+        t2.start();
+
+        t1.join();
+        t2.join();
+        t3.join();
+
+        Assert.assertFalse(SimpleScheduler.isAvailable(be1));
+        Thread.sleep((FeConstants.heartbeat_interval_second + 5) * 1000);
+        Assert.assertTrue(SimpleScheduler.isAvailable(be1));
+    }
 
-    // TODO(lingbin): PALO-2051.
-    // Comment out these code temporatily.
-    // @Test
-    public void testGetHostWithBackendId() {
-        FeConstants.heartbeat_interval_second = Integer.MAX_VALUE;
-        TNetworkAddress address;
-        // three locations
-        List<TScanRangeLocation> nullLocations = null;
-        List<TScanRangeLocation> emptyLocations = new ArrayList<TScanRangeLocation>();
-
-        List<TScanRangeLocation> twoLocations = new ArrayList<TScanRangeLocation>();
-        TScanRangeLocation locationA = new TScanRangeLocation();
-        TScanRangeLocation locationB = new TScanRangeLocation();
-        locationA.setBackendId(20);
-        locationA.setBackendId(30);
-        twoLocations.add(locationA);
-        twoLocations.add(locationB);
-
-        // three Backends
-        ImmutableMap<Long, Backend> nullBackends = null;
-        ImmutableMap<Long, Backend> emptyBackends = ImmutableMap.of();
-
-        Backend backendA = new Backend(0, "addressA", 0);
-        backendA.updateOnce(0, 0, 0);
-        Backend backendB = new Backend(1, "addressB", 0);
-        backendB.updateOnce(0, 0, 0);
-        Backend backendC = new Backend(2, "addressC", 0);
-        backendC.updateOnce(0, 0, 0);
-
-        Map<Long, Backend> threeBackends = Maps.newHashMap();
-        threeBackends.put((long) 0, backendA);
-        threeBackends.put((long) 1, backendB);
-        threeBackends.put((long) 2, backendC);
-        ImmutableMap<Long, Backend> immutableThreeBackends = ImmutableMap.copyOf(threeBackends);
-
-        // null Backends
-        try {
-            SimpleScheduler.getHost(Long.valueOf(0), nullLocations,
-                    nullBackends, ref);
-            Assert.fail();
-        } catch (UserException e) {
-
-        }
-
-        // empty Backends
-        try {
-            SimpleScheduler.getHost(Long.valueOf(0), emptyLocations,
-                    emptyBackends, ref);
-            Assert.fail();
-        } catch (UserException e) {
-
-        }
-
-        // normal Backends
-        try {
-            // BackendId exists
-            Assert.assertEquals(SimpleScheduler.getHost(0, emptyLocations, immutableThreeBackends, ref)
-                    .hostname, "addressA");
-            Assert.assertEquals(SimpleScheduler.getHost(2, emptyLocations, immutableThreeBackends, ref)
-                    .hostname, "addressC");
-
-            // BacknedId not exists and location exists, choose the locations's first
-            Assert.assertEquals(SimpleScheduler.getHost(3, twoLocations, immutableThreeBackends, ref)
-                    .hostname, "addressA");
-        } catch (UserException e) {
-            Assert.fail(e.getMessage());
-        }
-
-        // abnormal
-        try {
-            // BackendId not exists and location not exists
-            SimpleScheduler.getHost(3, emptyLocations, immutableThreeBackends, ref);
-            Assert.fail();
-        } catch (UserException e) {
-
-        }
-    }
+    @Test
+    public void testGetHostAbnormal() throws UserException, InterruptedException {
+        Reference<Long> ref = new Reference<Long>();
+        ImmutableMap<Long, Backend> backends = ImmutableMap.copyOf(genBackends());
 
-    // TODO(lingbin): PALO-2051.
-    // Comment out these code temporatily.
-    // @Test
-    public void testGetHostWithNoParams() {
-        FeConstants.heartbeat_interval_second = Integer.MAX_VALUE;
-        ImmutableMap<Long, Backend> nullBackends = null;
-        ImmutableMap<Long, Backend> emptyBackends = ImmutableMap.of();
-
-        Backend backendA = new Backend(0, "addressA", 0);
-        backendA.updateOnce(0, 0, 0);
-        Backend backendB = new Backend(1, "addressB", 0);
-        backendB.updateOnce(0, 0, 0);
-        Backend backendC = new Backend(2, "addressC", 0);
-        backendC.updateOnce(0, 0, 0);
-        Map<Long, Backend> threeBackends = Maps.newHashMap();
-        threeBackends.put((long) 0, backendA);
-        threeBackends.put((long) 1, backendB);
-        threeBackends.put((long) 2, backendC);
-        ImmutableMap<Long, Backend> immutableThreeBackends = ImmutableMap.copyOf(threeBackends);
-
-        // abnormal
-        try {
-            SimpleScheduler.getHost(nullBackends, ref);
-            Assert.fail();
-        } catch (UserException e) {
-
-        }
-
-        try {
-            SimpleScheduler.getHost(emptyBackends, ref);
-            Assert.fail();
-        } catch (UserException e) {
-
-        }
-
-        // normal
-        try {
-            String a = SimpleScheduler.getHost(immutableThreeBackends, ref).hostname;
-            String b = SimpleScheduler.getHost(immutableThreeBackends, ref).hostname;
-            String c = SimpleScheduler.getHost(immutableThreeBackends, ref).hostname;
-            Assert.assertTrue(!a.equals(b) && !a.equals(c) && !b.equals(c));
-            a = SimpleScheduler.getHost(immutableThreeBackends, ref).hostname;
-            b = SimpleScheduler.getHost(immutableThreeBackends, ref).hostname;
-            c = SimpleScheduler.getHost(immutableThreeBackends, ref).hostname;
-            Assert.assertTrue(!a.equals(b) && !a.equals(c) && !b.equals(c));
-        } catch (UserException e) {
-            Assert.fail(e.getMessage());
-        }
-    }
+        // 1. unknown backends
+        List<TScanRangeLocation> locations = Lists.newArrayList();
+        TScanRangeLocation scanRangeLocation1 = new TScanRangeLocation();
+        scanRangeLocation1.setBackendId(2000L);
+        locations.add(scanRangeLocation1);
+        TScanRangeLocation scanRangeLocation2 = new TScanRangeLocation();
+        scanRangeLocation2.setBackendId(2001L);
+        locations.add(scanRangeLocation2);
+
+        try {
+            SimpleScheduler.getHost(locations.get(0).backend_id, locations, backends, ref);
+            Assert.fail();
+        } catch (UserException e) {
+            System.out.println(e.getMessage());
+        }
 
-    // TODO(lingbin): PALO-2051.
-    // Comment out these code temporatily.
-    // @Test
-    public void testBlackList() throws UserException {
-        FeConstants.heartbeat_interval_second = Integer.MAX_VALUE;
-        TNetworkAddress address = null;
-
-        Backend backendA = new Backend(100, "addressA", 0);
-        backendA.updateOnce(0, 0, 0);
-        Backend backendB = new Backend(101, "addressB", 0);
-        backendB.updateOnce(0, 0, 0);
-        Backend backendC = new Backend(102, "addressC", 0);
-        backendC.updateOnce(0, 0, 0);
-        Map<Long, Backend> threeBackends = Maps.newHashMap();
-        threeBackends.put((long) 100, backendA);
-        threeBackends.put((long) 101, backendB);
-        threeBackends.put((long) 102, backendC);
-        ImmutableMap<Long, Backend> immutableThreeBackends = ImmutableMap.copyOf(threeBackends);
-
-        SimpleScheduler.addToBlacklist(Long.valueOf(100), "test");
-        SimpleScheduler.addToBlacklist(Long.valueOf(101), "test");
-        address = SimpleScheduler.getHost(immutableThreeBackends, ref);
-        // only backendc can work
-        Assert.assertEquals(address.hostname, "addressC");
-        SimpleScheduler.addToBlacklist(Long.valueOf(102), "test");
-        // no backend can work
-        address = SimpleScheduler.getHost(immutableThreeBackends, ref);
-        Assert.assertNull(address);
+        // 2. all backends in black list
+        locations.clear();
+        scanRangeLocation1 = new TScanRangeLocation();
+        scanRangeLocation1.setBackendId(be1.getId());
+        locations.add(scanRangeLocation1);
+        scanRangeLocation2 = new TScanRangeLocation();
+        scanRangeLocation2.setBackendId(be2.getId());
+        locations.add(scanRangeLocation2);
+        TScanRangeLocation scanRangeLocation3 = new TScanRangeLocation();
+        scanRangeLocation3.setBackendId(be3.getId());
+        locations.add(scanRangeLocation3);
+        TScanRangeLocation scanRangeLocation4 = new TScanRangeLocation();
+        scanRangeLocation4.setBackendId(be4.getId());
+        locations.add(scanRangeLocation4);
+        TScanRangeLocation scanRangeLocation5 = new TScanRangeLocation();
+        scanRangeLocation5.setBackendId(be5.getId());
+        locations.add(scanRangeLocation5);
+
+        SimpleScheduler.addToBlacklist(be1.getId(), "test");
+        SimpleScheduler.addToBlacklist(be2.getId(), "test");
+        SimpleScheduler.addToBlacklist(be3.getId(), "test");
+        SimpleScheduler.addToBlacklist(be4.getId(), "test");
+        SimpleScheduler.addToBlacklist(be5.getId(), "test");
+
+        try {
+            SimpleScheduler.getHost(locations.get(0).backend_id, locations, backends, ref);
+            Assert.fail();
+        } catch (UserException e) {
+            System.out.println(e.getMessage());
+        }
+
+        Thread.sleep((FeConstants.heartbeat_interval_second + 5) * 1000);
+        Assert.assertNotNull(SimpleScheduler.getHost(locations.get(0).backend_id, locations, backends, ref));
     }
 }
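Note on the timing in the rewritten tests: setUp() pins FeConstants.heartbeat_interval_second to 2, addToBlacklist() stores interval + 1 tries, and the once-per-second UpdateBlacklistThread decrements that counter, so an entry expires after about three seconds; the sleep of (interval + 5) seconds is slack for scheduler jitter. A self-checking sketch of that expiry arithmetic (assumed to match the decrement rule above):

    final class BlacklistExpiry {
        public static void main(String[] args) {
            int heartbeatIntervalSecond = 2;
            int tries = heartbeatIntervalSecond + 1; // value stored by addToBlacklist()
            int seconds = 0;
            while (tries > 0) {                      // one decrement per background pass
                tries--;
                seconds++;
            }
            // with interval = 2 the entry expires after 3 passes, i.e. ~3 seconds
            if (seconds != heartbeatIntervalSecond + 1) {
                throw new IllegalStateException("unexpected expiry time: " + seconds);
            }
            System.out.println("blacklist entry expires after ~" + seconds + "s");
        }
    }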