From b270da376d1bbf9e4b630dddae61dc55100f60bd Mon Sep 17 00:00:00 2001 From: JIANdan1 Date: Wed, 8 Nov 2023 14:34:56 +0800 Subject: [PATCH] algorithm init --- pom.xml | 101 ++++++++ .../base/RoundRobinLoadBalancer.java | 38 +++ .../edu/algorithm/entity/AdaptiveMetrics.java | 110 ++++++++ .../entity/ConcurrentHashMapUtils.java | 42 +++ .../java/edu/algorithm/entity/Invocation.java | 39 +++ .../algorithm/entity/NamedThreadFactory.java | 39 +++ .../java/edu/algorithm/entity/RpcStatus.java | 163 ++++++++++++ .../java/edu/algorithm/entity/RpcUtils.java | 4 + .../java/edu/algorithm/entity/Server.java | 74 ++++++ .../loadbalance/AdaptiveLoadBalance.java | 162 ++++++++++++ .../loadbalance/AdaptivePowerOfTwoChoice.java | 188 ++++++++++++++ .../ConsistentHashLoadBalance.java | 240 ++++++++++++++++++ .../loadbalance/LeastActiveLoadBalance.java | 126 +++++++++ .../loadbalance/RandomLoadBalance.java | 73 ++++++ .../loadbalance/RoundRobinLoadBalance.java | 122 +++++++++ .../ShortestResponseLoadBalance.java | 170 +++++++++++++ .../edu/algorithm/untils/FastJsonUtil.java | 11 + 17 files changed, 1702 insertions(+) create mode 100644 pom.xml create mode 100644 src/main/java/edu/algorithm/base/RoundRobinLoadBalancer.java create mode 100644 src/main/java/edu/algorithm/entity/AdaptiveMetrics.java create mode 100644 src/main/java/edu/algorithm/entity/ConcurrentHashMapUtils.java create mode 100644 src/main/java/edu/algorithm/entity/Invocation.java create mode 100644 src/main/java/edu/algorithm/entity/NamedThreadFactory.java create mode 100644 src/main/java/edu/algorithm/entity/RpcStatus.java create mode 100644 src/main/java/edu/algorithm/entity/RpcUtils.java create mode 100644 src/main/java/edu/algorithm/entity/Server.java create mode 100644 src/main/java/edu/algorithm/loadbalance/AdaptiveLoadBalance.java create mode 100644 src/main/java/edu/algorithm/loadbalance/AdaptivePowerOfTwoChoice.java create mode 100644 src/main/java/edu/algorithm/loadbalance/ConsistentHashLoadBalance.java create mode 100644 src/main/java/edu/algorithm/loadbalance/LeastActiveLoadBalance.java create mode 100644 src/main/java/edu/algorithm/loadbalance/RandomLoadBalance.java create mode 100644 src/main/java/edu/algorithm/loadbalance/RoundRobinLoadBalance.java create mode 100644 src/main/java/edu/algorithm/loadbalance/ShortestResponseLoadBalance.java create mode 100644 src/main/java/edu/algorithm/untils/FastJsonUtil.java diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..0d4e70e --- /dev/null +++ b/pom.xml @@ -0,0 +1,101 @@ + + + 4.0.0 + + org.example + algorithm-course + 1.0-SNAPSHOT + + + 8 + 8 + UTF-8 + + 1.8 + 1.8 + 3.2.0 + 4.12 + 5.2.8.RELEASE + 3.7.0 + + + + + org.projectlombok + lombok + 1.18.12 + + + + org.apache.dubbo + dubbo + + + + org.apache.dubbo + dubbo-dependencies-zookeeper + pom + + + + junit + junit + test + + + + org.springframework + spring-test + test + + + + com.alibaba + fastjson + 1.2.73 + + + org.junit.jupiter + junit-jupiter + RELEASE + compile + + + + + + + + org.springframework + spring-framework-bom + ${spring.version} + pom + import + + + + org.apache.dubbo + dubbo-bom + ${dubbo.version} + pom + import + + + + org.apache.dubbo + dubbo-dependencies-zookeeper + ${dubbo.version} + pom + + + + junit + junit + ${junit.version} + + + + + \ No newline at end of file diff --git a/src/main/java/edu/algorithm/base/RoundRobinLoadBalancer.java b/src/main/java/edu/algorithm/base/RoundRobinLoadBalancer.java new file mode 100644 index 0000000..56eebe3 --- /dev/null +++ b/src/main/java/edu/algorithm/base/RoundRobinLoadBalancer.java @@ -0,0 +1,38 @@ +package edu.algorithm.base; + +import java.util.ArrayList; +import java.util.List; + +public class RoundRobinLoadBalancer { + private List serverList; + private int currentIndex; + + public RoundRobinLoadBalancer(List serverList) { + this.serverList = serverList; + this.currentIndex = 0; + } + + public String getNextServer() { + String server = serverList.get(currentIndex); + currentIndex = (currentIndex + 1) % serverList.size(); + return server; + } + + // 示例使用 + public static void main(String[] args) { + // 创建服务器列表 + List serverList = new ArrayList<>(); + serverList.add("Server1"); + serverList.add("Server2"); + serverList.add("Server3"); + + // 创建轮询负载均衡器实例 + RoundRobinLoadBalancer loadBalancer = new RoundRobinLoadBalancer(serverList); + + // 模拟请求分发 + for (int i = 0; i < 10; i++) { + String server = loadBalancer.getNextServer(); + System.out.println("Request " + (i + 1) + " sent to server: " + server); + } + } +} \ No newline at end of file diff --git a/src/main/java/edu/algorithm/entity/AdaptiveMetrics.java b/src/main/java/edu/algorithm/entity/AdaptiveMetrics.java new file mode 100644 index 0000000..b839fda --- /dev/null +++ b/src/main/java/edu/algorithm/entity/AdaptiveMetrics.java @@ -0,0 +1,110 @@ +package edu.algorithm.entity; + +import org.apache.dubbo.common.utils.StringUtils; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + +public class AdaptiveMetrics { + + public final ConcurrentMap metricsStatistics = new ConcurrentHashMap<>(); + // + public long currentProviderTime = 0; + // 是在 ProfilerServerFilter 的 onResponse 方法中经过计算得到的 cpu load 依赖其他组件计算出来的 CPU负载 + public double providerCPULoad = 0; + // 是在 setProviderMetrics 方法里面维护的,其中 lastLatency 是在 ProfilerServerFilter 的 onResponse 方法中经过计算得到的 rt 值 + // /'leɪtənsɪ/ + public long lastLatency = 0; + // 当前时间 + public long currentTime = 0; + + //Allow some time disorder + public long pickTime = System.currentTimeMillis(); + + public double beta = 0.5; + // 是总的调用次数 + public final AtomicLong consumerReq = new AtomicLong(); + // 是在每次调用成功后在 AdaptiveLoadBalanceFilter 的 onResponse 方法中维护的值。 + public final AtomicLong consumerSuccess = new AtomicLong(); + // 每次出现调用异常时,维护的值 + public final AtomicLong errorReq = new AtomicLong(); + // 这是一个公式算出来的值 Vt = β * Vt-1 + (1 - β ) * θt + // 指数加权移动平均值的控制图 (exponentially weighted moving average) , EWMA主要用于对网络的状态参数进行估计和平滑 , 负责得到 当前服务器的“平滑负载指标” + // 有两个特点,1.不需要保存过去所有数值,2.计算量显著减少 + public double ewma = 0; + + public double getLoad(String idKey, int weight, int timeout) { + AdaptiveMetrics metrics = getStatus(idKey); + + //If the time more than 2 times, mandatory selected 如果超时时间超过 2次,则强制选择 + if (System.currentTimeMillis() - metrics.pickTime > timeout * 2) { + return 0; + } + + if (metrics.currentTime > 0) { + long multiple = (System.currentTimeMillis() - metrics.currentTime) / timeout + 1; + if (multiple > 0) { + if (metrics.currentProviderTime == metrics.currentTime) { + //penalty value + metrics.lastLatency = timeout * 2L; + } else { + metrics.lastLatency = metrics.lastLatency >> multiple; + } + metrics.ewma = metrics.beta * metrics.ewma + (1 - metrics.beta) * metrics.lastLatency; + metrics.currentTime = System.currentTimeMillis(); + } + } + + long inflight = metrics.consumerReq.get() - metrics.consumerSuccess.get() - metrics.errorReq.get(); + // Vt = β * Vt-1 + (1 - β ) * θt + // 服务器的CPU负载 乘以 ( (响应时间) * 当前正在处理的请求数 + 1) / ( 请求成功次数 ) / (( 请求次数 +1)*权重 +1) + return metrics.providerCPULoad * (Math.sqrt(metrics.ewma) + 1) * (inflight + 1) / ((((double) metrics.consumerSuccess.get() / (double) (metrics.consumerReq.get() + 1)) * weight) + 1); + } + + public AdaptiveMetrics getStatus(String idKey) { + return ConcurrentHashMapUtils.computeIfAbsent(metricsStatistics, idKey, k -> new AdaptiveMetrics()); + } + + public void addConsumerReq(String idKey) { + AdaptiveMetrics metrics = getStatus(idKey); + metrics.consumerReq.incrementAndGet(); + } + + public void addConsumerSuccess(String idKey) { + AdaptiveMetrics metrics = getStatus(idKey); + metrics.consumerSuccess.incrementAndGet(); + } + + public void addErrorReq(String idKey) { + AdaptiveMetrics metrics = getStatus(idKey); + metrics.errorReq.incrementAndGet(); + } + + public void setPickTime(String idKey, long time) { + AdaptiveMetrics metrics = getStatus(idKey); + metrics.pickTime = time; + } + + + public void setProviderMetrics(String idKey, Map metricsMap) { + + AdaptiveMetrics metrics = getStatus(idKey); + + long serviceTime = Long.parseLong(Optional.ofNullable(metricsMap.get("curTime")).filter(v -> StringUtils.isNumeric(v, false)).orElse("0")); + //If server time is less than the current time, discard + if (metrics.currentProviderTime > serviceTime) { + return; + } + + metrics.currentProviderTime = serviceTime; + metrics.currentTime = serviceTime; + metrics.providerCPULoad = Double.parseDouble(Optional.ofNullable(metricsMap.get("load")).filter(v -> StringUtils.isNumeric(v, true)).orElse("0")); + metrics.lastLatency = Long.parseLong((Optional.ofNullable(metricsMap.get("rt")).filter(v -> StringUtils.isNumeric(v, false)).orElse("0"))); + + metrics.beta = 0.5; + metrics.ewma = metrics.beta * metrics.ewma + (1 - metrics.beta) * metrics.lastLatency; + } +} \ No newline at end of file diff --git a/src/main/java/edu/algorithm/entity/ConcurrentHashMapUtils.java b/src/main/java/edu/algorithm/entity/ConcurrentHashMapUtils.java new file mode 100644 index 0000000..c78489f --- /dev/null +++ b/src/main/java/edu/algorithm/entity/ConcurrentHashMapUtils.java @@ -0,0 +1,42 @@ +package edu.algorithm.entity; + +import org.junit.jupiter.api.condition.JRE; + +import java.util.Objects; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Function; +public class ConcurrentHashMapUtils { + + /** + * A temporary workaround for Java 8 ConcurrentHashMap#computeIfAbsent specific performance issue: JDK-8161372.
+ * @see https://bugs.openjdk.java.net/browse/JDK-8161372 + * + */ + public static V computeIfAbsent(ConcurrentMap map, K key, Function func) { + Objects.requireNonNull(func); + if (JRE.JAVA_8.isCurrentVersion()) { + V v = map.get(key); + if (null == v) { + // issue#11986 lock bug + // v = map.computeIfAbsent(key, func); + + // this bug fix methods maybe cause `func.apply` multiple calls. + v = func.apply(key); + if(null == v){ + return null; + } + final V res = map.putIfAbsent(key, v); + if(null != res){ + // if pre value present, means other thread put value already, and putIfAbsent not effect + // return exist value + return res; + } + // if pre value is null, means putIfAbsent effected, return current value + } + return v; + } else { + return map.computeIfAbsent(key, func); + } + } + +} diff --git a/src/main/java/edu/algorithm/entity/Invocation.java b/src/main/java/edu/algorithm/entity/Invocation.java new file mode 100644 index 0000000..b7d1d16 --- /dev/null +++ b/src/main/java/edu/algorithm/entity/Invocation.java @@ -0,0 +1,39 @@ +package edu.algorithm.entity; + + +import lombok.Data; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +@Data +public class Invocation { + + private static final long serialVersionUID = -4355285085441097045L; + private String methodName; + private Class[] parameterTypes; + private Object[] arguments; + private Map attachments; + + private transient Map attributes = Collections.synchronizedMap(new HashMap<>()); + + public void setAttachment(String key, Object value) { + setObjectAttachment(key, value); + } + + public Map getAttributes() { + return attributes; + } + public void setObjectAttachment(String key, Object value) { + try { + + if (attachments == null) { + attachments = new HashMap<>(); + } + attachments.put(key, value); + } finally { + + } + } +} diff --git a/src/main/java/edu/algorithm/entity/NamedThreadFactory.java b/src/main/java/edu/algorithm/entity/NamedThreadFactory.java new file mode 100644 index 0000000..638253b --- /dev/null +++ b/src/main/java/edu/algorithm/entity/NamedThreadFactory.java @@ -0,0 +1,39 @@ +package edu.algorithm.entity; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class NamedThreadFactory implements ThreadFactory { + protected static final AtomicInteger POOL_SEQ = new AtomicInteger(1); + protected final AtomicInteger mThreadNum; + protected final String mPrefix; + protected final boolean mDaemon; + protected final ThreadGroup mGroup; + + public NamedThreadFactory() { + this("pool-" + POOL_SEQ.getAndIncrement(), false); + } + + public NamedThreadFactory(String prefix) { + this(prefix, false); + } + + public NamedThreadFactory(String prefix, boolean daemon) { + this.mThreadNum = new AtomicInteger(1); + this.mPrefix = prefix + "-thread-"; + this.mDaemon = daemon; + SecurityManager s = System.getSecurityManager(); + this.mGroup = s == null ? Thread.currentThread().getThreadGroup() : s.getThreadGroup(); + } + + public Thread newThread(Runnable runnable) { + String name = this.mPrefix + this.mThreadNum.getAndIncrement(); + Thread ret = new Thread(this.mGroup, runnable, name, 0L); + ret.setDaemon(this.mDaemon); + return ret; + } + + public ThreadGroup getThreadGroup() { + return this.mGroup; + } +} diff --git a/src/main/java/edu/algorithm/entity/RpcStatus.java b/src/main/java/edu/algorithm/entity/RpcStatus.java new file mode 100644 index 0000000..d9817ef --- /dev/null +++ b/src/main/java/edu/algorithm/entity/RpcStatus.java @@ -0,0 +1,163 @@ +package edu.algorithm.entity; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +public class RpcStatus { + // 用于保存所有服务实例的 + public static final ConcurrentMap SERVICE_STATISTICS = new ConcurrentHashMap(); + + public final ConcurrentMap values = new ConcurrentHashMap(); + // 每个服务器的最少活跃连接数 + public final AtomicInteger active = new AtomicInteger(); + public final AtomicLong total = new AtomicLong(); + public final AtomicInteger failed = new AtomicInteger(); + public final AtomicLong totalElapsed = new AtomicLong(); + public final AtomicLong failedElapsed = new AtomicLong(); + public final AtomicLong maxElapsed = new AtomicLong(); + public final AtomicLong failedMaxElapsed = new AtomicLong(); + public final AtomicLong succeededMaxElapsed = new AtomicLong(); + + + private RpcStatus() { + } + + public static RpcStatus getStatus(String uri) { + return (RpcStatus)SERVICE_STATISTICS.computeIfAbsent(uri, (key) -> { + return new RpcStatus(); + }); + } + + public static void removeStatus(String uri) { + SERVICE_STATISTICS.remove(uri); + } + + public static void beginCount(String uri ) { + beginCount(uri, Integer.MAX_VALUE); + } + // 请求处理开始,记录每个服务器活跃连接数 + public static boolean beginCount(String url, int max) { + max = max <= 0 ? Integer.MAX_VALUE : max; + RpcStatus appStatus = getStatus(url); + if (appStatus.active.get() == Integer.MAX_VALUE) { + return false; + } else { + int i; + do { + i = appStatus.active.get(); + if (i == Integer.MAX_VALUE || i + 1 > max) { + return false; + } + } while(!appStatus.active.compareAndSet(i, i + 1)); + + appStatus.active.incrementAndGet(); + return true; + } + } + // 请求处理完成后,更新选择服务的活跃连接数 + public static void endCount(RpcStatus status, long elapsed, boolean succeeded) { + status.active.decrementAndGet(); + status.total.incrementAndGet(); + status.totalElapsed.addAndGet(elapsed); + if (status.maxElapsed.get() < elapsed) { + status.maxElapsed.set(elapsed); + } + + if (succeeded) { + if (status.succeededMaxElapsed.get() < elapsed) { + status.succeededMaxElapsed.set(elapsed); + } + } else { + status.failed.incrementAndGet(); + status.failedElapsed.addAndGet(elapsed); + if (status.failedMaxElapsed.get() < elapsed) { + status.failedMaxElapsed.set(elapsed); + } + } + } + + public int getActive() { + return this.active.get(); + } + + public void set(String key, Object value) { + this.values.put(key, value); + } + + public Object get(String key) { + return this.values.get(key); + } + + + public long getTotal() { + return this.total.longValue(); + } + + public long getTotalElapsed() { + return this.totalElapsed.get(); + } + + public long getAverageElapsed() { + long total = this.getTotal(); + return total == 0L ? 0L : this.getTotalElapsed() / total; + } + + public long getMaxElapsed() { + return this.maxElapsed.get(); + } + + public int getFailed() { + return this.failed.get(); + } + + public long getFailedElapsed() { + return this.failedElapsed.get(); + } + + public long getFailedAverageElapsed() { + long failed = (long)this.getFailed(); + return failed == 0L ? 0L : this.getFailedElapsed() / failed; + } + + public long getFailedMaxElapsed() { + return this.failedMaxElapsed.get(); + } + + public long getSucceeded() { + return this.getTotal() - (long)this.getFailed(); + } + + public long getSucceededElapsed() { + return this.getTotalElapsed() - this.getFailedElapsed(); + } + + public long getSucceededAverageElapsed() { + long succeeded = this.getSucceeded(); + return succeeded == 0L ? 0L : this.getSucceededElapsed() / succeeded; + } + + public long getSucceededMaxElapsed() { + return this.succeededMaxElapsed.get(); + } + + public long getAverageTps() { + return this.getTotalElapsed() >= 1000L ? this.getTotal() / (this.getTotalElapsed() / 1000L) : this.getTotal(); + } + + @Override + public String toString() { + return "RpcStatus{" + + "values=" + values + + ", active=" + active + + ", total=" + total + + ", failed=" + failed + + ", totalElapsed=" + totalElapsed + + ", failedElapsed=" + failedElapsed + + ", maxElapsed=" + maxElapsed + + ", failedMaxElapsed=" + failedMaxElapsed + + ", succeededMaxElapsed=" + succeededMaxElapsed + + '}'; + } +} diff --git a/src/main/java/edu/algorithm/entity/RpcUtils.java b/src/main/java/edu/algorithm/entity/RpcUtils.java new file mode 100644 index 0000000..578eff6 --- /dev/null +++ b/src/main/java/edu/algorithm/entity/RpcUtils.java @@ -0,0 +1,4 @@ +package edu.algorithm.entity; + +public class RpcUtils { +} diff --git a/src/main/java/edu/algorithm/entity/Server.java b/src/main/java/edu/algorithm/entity/Server.java new file mode 100644 index 0000000..41ed22a --- /dev/null +++ b/src/main/java/edu/algorithm/entity/Server.java @@ -0,0 +1,74 @@ +package edu.algorithm.entity; + +import lombok.Data; + +@Data +public class Server { + // 服务器 id + private int id; + // 服务器 名称 + private String name; + // 服务器 端口号 + private String port; + // 服务器 ip地址 + private String ip; + // 权重 + private int weight; + // 消耗的时间(响应时间) + private int elapsed; + + private int replicaNumber; + + private String hashArguments; + + private int timeOut; + + public Server(int id, String name, String port, String ip, int weight) { + this.id = id; + this.name = name; + this.port = port; + this.ip = ip; + this.weight = weight; + } + public Server(int id, String name, String port, String ip, int weight,int elapsed) { + this.id = id; + this.name = name; + this.port = port; + this.ip = ip; + this.weight = weight; + this.elapsed = elapsed; + } + public Server(int id, String name, String port, String ip, int weight,int elapsed,int replicaNumber,String hashArguments) { + this.id = id; + this.name = name; + this.port = port; + this.ip = ip; + this.weight = weight; + this.elapsed = elapsed; + this.replicaNumber = replicaNumber; + this.hashArguments = hashArguments; + } + + public Server(int id, String name, String port, String ip, int weight,int elapsed,int replicaNumber,String hashArguments,int timeOut) { + this.id = id; + this.name = name; + this.port = port; + this.ip = ip; + this.weight = weight; + this.elapsed = elapsed; + this.replicaNumber = replicaNumber; + this.hashArguments = hashArguments; + this.timeOut = timeOut; + } + + @Override + public String toString() { + return "Server{" + + "id=" + id + + ", name='" + name + '\'' + + ", port='" + port + '\'' + + ", ip='" + ip + '\'' + + ", weight=" + weight + + '}'; + } +} diff --git a/src/main/java/edu/algorithm/loadbalance/AdaptiveLoadBalance.java b/src/main/java/edu/algorithm/loadbalance/AdaptiveLoadBalance.java new file mode 100644 index 0000000..63815ca --- /dev/null +++ b/src/main/java/edu/algorithm/loadbalance/AdaptiveLoadBalance.java @@ -0,0 +1,162 @@ +package edu.algorithm.loadbalance; + +import com.alibaba.dubbo.rpc.RpcInvocation; +import edu.algorithm.entity.AdaptiveMetrics; + +import edu.algorithm.entity.Invocation; +import edu.algorithm.entity.Server; +import org.apache.dubbo.common.constants.LoadbalanceRules; +import org.apache.dubbo.common.utils.StringUtils; +import org.apache.dubbo.rpc.Constants; + + + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.dubbo.common.constants.CommonConstants.*; + +public class AdaptiveLoadBalance { + + public static final String NAME = "adaptive"; + + // default key + private String attachmentKey = "mem,load"; + + private final AdaptiveMetrics adaptiveMetrics; + + private int DEFAULT_TIMEOUT = 1000; + + public AdaptiveLoadBalance(){ + adaptiveMetrics = new AdaptiveMetrics(); + } + + protected Server doSelect(List invokers, Invocation invocation) { + // 通过P2C 算法 选择一个服务器 + Server invoker = selectByP2C(invokers,invocation); + // 设置计算负载的参数 + invocation.setAttachment(Constants.ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY,attachmentKey); + // 请求开始处理,记录请求开始处理时间 + long startTime = System.currentTimeMillis(); + // 通过 invocation 传递开始时间,用于计算负载和吞吐量 + invocation.getAttributes().put(Constants.ADAPTIVE_LOADBALANCE_START_TIME,startTime); + // 设置负载均衡器的 自适应算法 + invocation.getAttributes().put(LOADBALANCE_KEY, LoadbalanceRules.ADAPTIVE); + // 保存消费者 + adaptiveMetrics.addConsumerReq(getServiceKey(invoker,invocation)); + // 保存选择的开始时间 + adaptiveMetrics.setPickTime(getServiceKey(invoker,invocation),startTime); + + return invoker; + } + + /** + * 根据服务列列表 随机选择两个服务器,然后对比负载情况,选择负载较轻的一台 + * @param invokers + * @param invocation + * @return + */ + private Server selectByP2C(List invokers, Invocation invocation){ + int length = invokers.size(); + if(length == 1) { + return invokers.get(0); + } + + if(length == 2) { + return chooseLowLoadInvoker(invokers.get(0),invokers.get(1),invocation); + } + // 随机选择两个服务器 + int pos1 = ThreadLocalRandom.current().nextInt(length); + int pos2 = ThreadLocalRandom.current().nextInt(length - 1); + if (pos2 >= pos1) { + pos2 = pos2 + 1; + } + // 动态计算两个服务器的负载情况,选择负载较轻的服务器 + return chooseLowLoadInvoker(invokers.get(pos1),invokers.get(pos2),invocation); + } + + private String getServiceKey(Server invoker,Invocation invocation){ + + String key = (String) invocation.getAttributes().get(invoker); + if (StringUtils.isNotEmpty(key)){ + return key; + } + + key = buildServiceKey(invoker,invocation); + invocation.getAttributes().put(invoker,key); + return key; + } + + private String buildServiceKey(Server invoker,Invocation invocation){ +// URL url = invoker.getUrl(); +// StringBuilder sb = new StringBuilder(128); +// sb.append(url.getAddress()).append(":").append(invocation.getProtocolServiceKey()); + // 目的是拿到一个 服务器的key ,这里默认拿ip地址 + return invoker.getIp(); + } + + private int getTimeout(Server invoker, Invocation invocation) { +// URL url = invoker.getUrl(); +// String methodName = RpcUtils.getMethodName(invocation); +// return (int) RpcUtils.getTimeout(url,methodName, RpcContext.getClientAttachment(),invocation, DEFAULT_TIMEOUT); +// + // 目的是拿到一个 超时时间, 这里默认拿服务器的默认配置 + return invoker.getTimeOut(); + } + + private Server chooseLowLoadInvoker(Server invoker1,Server invoker2,Invocation invocation){ + int weight1 = invoker1.getWeight(); + int weight2 = invoker2.getWeight(); + int timeout1 = getTimeout(invoker1, invocation); + int timeout2 = getTimeout(invoker2, invocation); + long load1 = Double.doubleToLongBits(adaptiveMetrics.getLoad(getServiceKey(invoker1,invocation),weight1,timeout1 )); + long load2 = Double.doubleToLongBits(adaptiveMetrics.getLoad(getServiceKey(invoker2,invocation),weight2,timeout2 )); + + // 负载相同的情况下 + if (load1 == load2) { + // The sum of weights + int totalWeight = weight1 + weight2; + if (totalWeight > 0) { + // 根据权重随机选择 + int offset = ThreadLocalRandom.current().nextInt(totalWeight); + if (offset < weight1) { + return invoker1; + } + return invoker2; + } + // 默认权重为0时 随机选择 + return ThreadLocalRandom.current().nextInt(2) == 0 ? invoker1 : invoker2; + } + // 根据负载情况选择服务器,负载低的 被选择。 + return load1 > load2 ? invoker2 : invoker1; + } + + public static void main(String[] args) { + List serverList = new ArrayList<>(); + serverList.add(new Server(1,"服务器1","8080","127.0.0.1",90,0,0,"",2000)); + serverList.add(new Server(2,"服务器2","8090","127.0.0.2",80,0,0,"",3000)); + serverList.add(new Server(3,"服务器3","8088","127.0.0.3",70,0,0,"",3000)); + serverList.add(new Server(4,"服务器4","8099","127.0.0.4",70,0,0,"",3000)); + serverList.add(new Server(5,"服务器5","8070","127.0.0.5",80,0,0,"",3000)); + serverList.add(new Server(6,"服务器6","8060","127.0.0.6",70,0,0,"",4000)); + serverList.add(new Server(7,"服务器7","8050","127.0.0.7",80,0,0,"",5000)); + + AdaptiveLoadBalance adaptiveLoadBalance = new AdaptiveLoadBalance(); + + + Invocation invocation = new Invocation(); + for (int i = 0; i < 1000; i++) { + // 负载均衡策略的执行,即是在所有的Provider中选出一个,作为当前Consumer的远程调用对象 + System.out.println(adaptiveLoadBalance.doSelect(serverList,invocation).toString()); + } + Iterator iterator = adaptiveLoadBalance.adaptiveMetrics.metricsStatistics.entrySet().iterator(); + while (iterator.hasNext()){ + Map.Entry entry = (Map.Entry) iterator.next(); + System.out.println("Key: "+entry.getKey()+" consumerReq: "+entry.getValue().consumerReq ); + } + } + +} \ No newline at end of file diff --git a/src/main/java/edu/algorithm/loadbalance/AdaptivePowerOfTwoChoice.java b/src/main/java/edu/algorithm/loadbalance/AdaptivePowerOfTwoChoice.java new file mode 100644 index 0000000..7c915b2 --- /dev/null +++ b/src/main/java/edu/algorithm/loadbalance/AdaptivePowerOfTwoChoice.java @@ -0,0 +1,188 @@ +//package edu.algorithm.loadbalance; +// +//import edu.algorithm.entity.Invocation; +//import edu.algorithm.entity.NamedThreadFactory; +//import edu.algorithm.entity.RpcStatus; +//import edu.algorithm.entity.Server; +// +//import java.util.ArrayList; +//import java.util.Iterator; +//import java.util.List; +//import java.util.Map; +//import java.util.concurrent.ConcurrentHashMap; +//import java.util.concurrent.ExecutorService; +//import java.util.concurrent.Executors; +//import java.util.concurrent.ThreadLocalRandom; +//import java.util.concurrent.atomic.AtomicBoolean; +// +//public class AdaptivePowerOfTwoChoice { +// +// public static final String NAME = "adaptiveP2C"; +// +// private int slidePeriod = 30000; +// +// private ExecutorService executorService = Executors +// .newCachedThreadPool(new NamedThreadFactory("Dubbo-framework-shared-handler-adaptiveP2C", true)); +// +// private volatile long lastUpdateTime = System.currentTimeMillis(); +// +// private AtomicBoolean onResetSlideWindow = new AtomicBoolean(false); +// +// private ConcurrentHashMap methodMap = new ConcurrentHashMap<>(); +// +// +// protected static class SlideWindowData{ +// +// private RpcStatus rpcStatus; +// +// private long totalOffset; +// +// private long succeedOffset; +// +// private long totalElapsedOffset; +// +// private long succeedElapsedOffset; +// +// public SlideWindowData(RpcStatus rpcStatus){ +// this.rpcStatus = rpcStatus; +// this.totalOffset = 0; +// this.totalElapsedOffset = 0; +// this.succeedElapsedOffset = 0; +// this.succeedOffset = 0; +// } +// +// public void reset(){ +// this.totalOffset = rpcStatus.getTotal(); +// this.succeedOffset = rpcStatus.getSucceeded(); +// this.totalElapsedOffset = rpcStatus.getTotalElapsed(); +// this.succeedElapsedOffset = rpcStatus.getSucceededElapsed(); +// } +// +// public double getLatency(){ +// if((this.rpcStatus.getSucceeded() - this.succeedOffset) == 0){ +// return 0; +// } +// return (double) (this.rpcStatus.getSucceededElapsed() - this.succeedElapsedOffset)/(this.rpcStatus.getSucceeded() - this.succeedOffset); +// } +// +// public long getAccept(){ +// return (this.rpcStatus.getSucceeded() - this.succeedOffset); +// } +// +// private long getRequest(){ +// return (this.rpcStatus.getTotal() - this.totalOffset); +// } +// +// } +// +// private double getNormlize(double x){ +// return x/(1 + x); +// } +// +// public double getWeight(Server server, Invocation invocation, double averageLatency){ +// // 用 IP地址存储 每个服务的 访问次数和统计情况 +// RpcStatus rpcStatus = RpcStatus.getStatus(server.getIp()); +// SlideWindowData slideWindowData = methodMap.get(rpcStatus); +// double latency = (1 + averageLatency)/(1 + slideWindowData.getLatency()); +// +// return (1 + slideWindowData.getAccept())/(1 + slideWindowData.getRequest()) * getNormlize(latency); +// } +// +// private Server leastWeight(Server invoker1,Server invoker2,Invocation invocation,double averageLatency){ +// double weight1 = getWeight(invoker1,invocation,averageLatency); +// double weight2 = getWeight(invoker2,invocation,averageLatency); +// if(weight1 >= weight2) +// return invoker1; +// return invoker2; +// } +// +// public double getAverageLatency(List invokers){ +// double averageLatency = 0; +// double totalLatency = 0; +// int length = invokers.size(); +// for(int i = 0; i < length; i++){ +// RpcStatus rpcStatus = RpcStatus.getStatus(invokers.get(i).getIp()); +// SlideWindowData slideWindowData = methodMap.computeIfAbsent(rpcStatus,SlideWindowData::new); +// totalLatency += slideWindowData.getLatency(); +// } +// averageLatency = totalLatency/length; +// return averageLatency; +// } +// +// +// protected Server select(List invokers, Invocation invocation) { +// // 获取服务实例 数组长度 +// int length = invokers.size(); +// if(length == 1) +// return invokers.get(0); +// +// double averageLatency = 0; +// double totalLatency = 0; +// +// while(!onResetSlideWindow.compareAndSet(false,true)){} +// +// // 为每个服务实例都创建一RpcStatus 窗口,记录 +// for(int i = 0; i < length; i++){ +// RpcStatus rpcStatus = RpcStatus.getStatus(invokers.get(i).getIp()); +// SlideWindowData slideWindowData = methodMap.computeIfAbsent(rpcStatus,SlideWindowData::new); +// totalLatency += slideWindowData.getLatency(); +// } +// onResetSlideWindow.set(false); +// +// averageLatency = totalLatency/length; +// +// if(length == 2) +// return leastWeight(invokers.get(0),invokers.get(1),invocation,averageLatency); +// +// int pos1 = ThreadLocalRandom.current().nextInt(length); +// int pos2 = ThreadLocalRandom.current().nextInt(length - 1); +// +// if(pos2 >= pos1){ +// pos2 = pos2 + 1; +// } +// +// Server result = leastWeight(invokers.get(pos1),invokers.get(pos2),invocation,averageLatency); +// +// if(System.currentTimeMillis() - lastUpdateTime > slidePeriod && onResetSlideWindow.compareAndSet(false,true)){ +// executorService.execute(() -> { +// methodMap.values().forEach(SlideWindowData::reset); +// lastUpdateTime = System.currentTimeMillis(); +// onResetSlideWindow.set(false); +// }); +// } +// return result; +// } +// +// +// public static void main(String[] args) { +// List serverList = new ArrayList<>(); +// serverList.add(new Server(1,"服务器1","8080","127.0.0.1",90,20,0,"")); +// serverList.add(new Server(2,"服务器2","8090","127.0.0.2",90,20,0,"")); +// serverList.add(new Server(3,"服务器3","8088","127.0.0.3",80,30,0,"")); +// serverList.add(new Server(4,"服务器4","8099","127.0.0.4",80,30,0,"")); +// serverList.add(new Server(5,"服务器5","8070","127.0.0.5",70,35,0,"")); +// serverList.add(new Server(6,"服务器6","8060","127.0.0.6",88,28,0,"")); +// serverList.add(new Server(7,"服务器7","8050","127.0.0.7",78,38,0,"")); +// +// +// +// AdaptivePowerOfTwoChoice powerOfTwoChoice = new AdaptivePowerOfTwoChoice(); +// Invocation invocation = new Invocation(); +// invocation.setArguments(new String[]{"454545"}); +// +// for (int i = 0; i < 1000; i++) { +// Server server = powerOfTwoChoice.select(serverList,invocation); +// System.out.println(server.toString()); +// +// RpcStatus.beginCount(server.getIp()); +// } +// +// Iterator iterator = RpcStatus.SERVICE_STATISTICS.entrySet().iterator(); +// while (iterator.hasNext()){ +// Map.Entry entry = (Map.Entry) iterator.next(); +// System.out.println("Key: "+entry.getKey()+" Value: "+entry.getValue().toString() + " avg:" + entry.getValue().getAverageElapsed()); +// } +// +// } +// +//} diff --git a/src/main/java/edu/algorithm/loadbalance/ConsistentHashLoadBalance.java b/src/main/java/edu/algorithm/loadbalance/ConsistentHashLoadBalance.java new file mode 100644 index 0000000..96c803f --- /dev/null +++ b/src/main/java/edu/algorithm/loadbalance/ConsistentHashLoadBalance.java @@ -0,0 +1,240 @@ +package edu.algorithm.loadbalance; + +import edu.algorithm.entity.Invocation; +import edu.algorithm.entity.RpcStatus; +import edu.algorithm.entity.Server; +import org.apache.dubbo.common.constants.CommonConstants; +import org.apache.dubbo.common.io.Bytes; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + + +public class ConsistentHashLoadBalance { + + + // 提供请求之间的隔离,相同参数的请求总是发到同一提供者 + private final ConcurrentMap> selectors = new ConcurrentHashMap(); + + public ConsistentHashLoadBalance() { + } + + + protected Server doSelect(List serverList,Invocation invocation) { + // 获取方法名 + String methodName = "edu.example.method"; + // key格式:接口名.方法名 + String key = serverList.get(0).getIp() + "." + methodName; +// String key = "test." + methodName; + // identityHashCode 用来识别invokers是否发生过变更 + int invokersHashCode = this.getCorrespondingHashCode(serverList); + // 根key,从缓存中获取 ConsistentHashSelector + ConsistentHashSelector selector = (ConsistentHashSelector)this.selectors.get(key); + // 若不存在"接口.方法名"对应的选择器,或是Invoker列表已经发生了变更,则初始化一个选择器。 + if (selector == null || selector.identityHashCode != invokersHashCode) { + // 若不存在"接口.方法名"对应的选择器,或是Invoker列表已经发生了变更,则初始化一个选择器 + this.selectors.put(key, new ConsistentHashSelector(serverList, methodName, invokersHashCode)); + selector = (ConsistentHashSelector)this.selectors.get(key); + } + // 通过选择器去选择一个 服务实例 + return selector.select(invocation); + } + + public int getCorrespondingHashCode(List serverList) { + return serverList.hashCode(); + } + + private static final class ConsistentHashSelector { + /** + * 存储Hash值与节点映射关系的TreeMap + */ + private final TreeMap virtualInvokers = new TreeMap(); + /** + * 节点数目 + */ + private final int replicaNumber = 160; + /** + * 用来识别Invoker列表是否发生变更的Hash码 + */ + private final int identityHashCode; + /** + * 请求中用来作Hash映射的参数的索引 + */ + private final int[] argumentIndex; + /** + * 服务器请求 变更的数量 + */ + private Map serverRequestCountMap = new ConcurrentHashMap(); + /** + * 总请求次数 + */ + private AtomicLong totalRequestCount; + /** + * 服务的数量 + */ + private int serverCount; + /** + * 过载因子 + */ + private static final double OVERLOAD_RATIO_THREAD = 1.5; + + ConsistentHashSelector(List serverList, String methodName, int identityHashCode) { + // hashcode + this.identityHashCode = identityHashCode; +// String url = serverList.get(0).getIp(); +// this.replicaNumber = serverList.get(0).getMethodParameter(methodName,"hash.nodes",160); + // 获取 hash.arguments ,缺省是 0 然后进行切割 + // 一致性Hash负载均衡涉及到两个主要的配置参数为hash.arguments 与hash.nodes。 + + // 缺省只对第一个参数Hash,如果要修改,请配置 + // hash.arguments : 当进行调用时候根据调用方法的哪几个参数生成key, + // 并根据key来通过一致性hash算法来选择调用结点。例如调用方法invoke(String s1,String s2); + // 若hash.arguments为1(默认值),则仅取invoke的参数1(s1)来生成hashCode + // 到底使用那个入参去 继续hash 计算 + String[] index = new String[]{"0"}; // 获取Hash映射参数的下标配置项,这里默认使用 0 + + this.argumentIndex = new int[index.length]; + + for(int i = 0; i < index.length; ++i) { + this.argumentIndex[i] = Integer.parseInt(index[i]); + } + + Iterator var14 = serverList.iterator(); + // 把所有的服务实例 通过计算 映射服务到哈希环上 + while(var14.hasNext()) { + Server server = (Server) var14.next(); + // 获取地址,这里根据IP 去计算 + String address = server.getIp() +":"+ server.getPort(); + + for(int i = 0; i < this.replicaNumber / 4; ++i) { + byte[] digest = Bytes.getMD5(address + i); + + for(int h = 0; h < 4; ++h) { + // 计算当前服务实例的位置 通过识别码 digest 与 4294967295L(2^32-1) 取模,这里的取模是通过位运算 + // 这里的hash 是通过高低位运算去掉高位,低位空位补零然后与 2^32-1 取模 得到落点,作用就是 把我们的服务器 映射到4个分段去 + long m = this.hash(digest, h); + // 得到落点 long 然后映射服务 + this.virtualInvokers.put(m, server); + } + } + } + + this.totalRequestCount = new AtomicLong(0L); + this.serverCount = serverList.size(); + this.serverRequestCountMap.clear(); + } + + public Server select(Invocation invocation) { + // 根据invocation的【参数值】来确定key,默认使用第一个参数来做hash计算 + String key = this.toKey(invocation.getArguments()); + // 获取【参数值】的md5编码 + byte[] digest = Bytes.getMD5(key); + return this.selectForKey(this.hash(digest, 0)); + } + // 根据参数索引获取参数,并将所有参数拼接成字符串 + private String toKey(Object[] args) { + // 这里就是根据 参数列表 拼装成一个字符串了 + StringBuilder buf = new StringBuilder(); + int[] var3 = this.argumentIndex; + int var4 = var3.length; + + for(int var5 = 0; var5 < var4; ++var5) { + int i = var3[var5]; + if (i >= 0 && i < args.length) { + buf.append(args[i]); + } + } + + return buf.toString(); + } + + // 根据参数字符串的md5编码找出Invoker + private Server selectForKey(long hash) { + // 获取跟当前 hash 相同的 Entry,如果不存在,就返回 大于 这个hash 的最小的一个条目,如果也不存在,就返回null + Map.Entry entry = this.virtualInvokers.ceilingEntry(hash); + // 如果没有 就去第一个,可以看成是一个环 + if (entry == null) { + entry = this.virtualInvokers.firstEntry(); + } + + // 这里获取 服务实例的地址 + String serverAddress = ((Server)entry.getValue()).getIp() +":"+ entry.getValue().getPort(); + // 最大请求数 除以 服务列表 再乘以 1.5 得到overloadThread + // 然后从访问列表里找,是否有这个当前服务实例的访问记录,如果有就判断它的访问次数 是否大于等于 overloadThread + // 目的是为了找到一个访问次数较少的一个服务器 + for(double overloadThread = (double)this.totalRequestCount.get() / + (double)this.serverCount * 1.5; this.serverRequestCountMap.containsKey(serverAddress) && + (double)((AtomicLong)this.serverRequestCountMap.get(serverAddress)).get() >= overloadThread; + serverAddress = ((Server)entry.getValue()).getIp() +":"+ entry.getValue().getPort()) { + + // 找到一个最近的,如果没有就开始新的一环 + entry = this.getNextInvokerNode(this.virtualInvokers, entry); + } + + // 保存访问次数 + if (!this.serverRequestCountMap.containsKey(serverAddress)) { + this.serverRequestCountMap.put(serverAddress, new AtomicLong(1L)); + } else { + ((AtomicLong)this.serverRequestCountMap.get(serverAddress)).incrementAndGet(); + } + + this.totalRequestCount.incrementAndGet(); + return entry.getValue(); + } + + private Map.Entry getNextInvokerNode(TreeMap virtualInvokers, Map.Entry entry) { + // 获取大于指定键值的项,如果不存在,就从第一个开始,这里的作用就是获取一个大于entry的最近的一个entry 如果不存在开启新的一个轮了 + Map.Entry nextEntry = virtualInvokers.higherEntry(entry.getKey()); + return nextEntry == null ? virtualInvokers.firstEntry() : nextEntry; + } + + private long hash(byte[] digest, int number) { + // 这串源码的作用是 把md5值去除高位 拆成4部分之后与 2的32次方取模 得到4个分布值 + // byte[] digest = {60,-15, -74, 11, 122, -68, -124, -79, 100, -110, -34, -27, -48, 87, 33, -60}; + // {60,-15, -74, 11} + // long a = 60 & 255 ; 60 + // long b = -15 & 255 << 8; 65250 + // long c = -74 & 255 << 16; 16711680 + // long d = 11 & 255 << 24; 0 + // System.out.println(a|b|c|d); a+b+c+d = 60 + 65250 + 16711680 + 0 =16777020 + // 然后 16777020 % 4294967295L + return ((long)(digest[3 + number * 4] & 255) << 24 | + (long)(digest[2 + number * 4] & 255) << 16 | + (long)(digest[1 + number * 4] & 255) << 8 | + (long)(digest[number * 4] & 255)) + & 4294967295L; + } + } + + + public static void main(String[] args) { + + List serverList = new ArrayList<>(); + serverList.add(new Server(1,"服务器1","8080","127.0.0.1",0,0,0,"")); + serverList.add(new Server(2,"服务器2","8090","127.0.0.2",0,0,0,"")); + serverList.add(new Server(3,"服务器3","8088","127.0.0.3",0,0,0,"")); + serverList.add(new Server(4,"服务器4","8099","127.0.0.4",0,0,0,"")); + serverList.add(new Server(5,"服务器5","8070","127.0.0.5",0,0,0,"")); + serverList.add(new Server(6,"服务器6","8060","127.0.0.6",0,0,0,"")); + serverList.add(new Server(7,"服务器7","8050","127.0.0.7",0,0,0,"")); + + + + ConsistentHashLoadBalance hashLoadBalance = new ConsistentHashLoadBalance(); + Invocation invocation = new Invocation(); + invocation.setArguments(new String[]{"loadbalance"}); + ConcurrentMap> selectors = null; + for (int i = 0; i < 10; i++) { + invocation = new Invocation(); + invocation.setArguments(new String[]{""+i}); + // 负载均衡策略的执行,即是在所有的Provider中选出一个,作为当前Consumer的远程调用对象 + System.out.println(hashLoadBalance.doSelect(serverList,invocation).toString()); + selectors = hashLoadBalance.selectors; + } + System.out.println(selectors.toString()); + + + } +} diff --git a/src/main/java/edu/algorithm/loadbalance/LeastActiveLoadBalance.java b/src/main/java/edu/algorithm/loadbalance/LeastActiveLoadBalance.java new file mode 100644 index 0000000..dc34eed --- /dev/null +++ b/src/main/java/edu/algorithm/loadbalance/LeastActiveLoadBalance.java @@ -0,0 +1,126 @@ +package edu.algorithm.loadbalance; + +import edu.algorithm.entity.RpcStatus; +import edu.algorithm.entity.Server; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ThreadLocalRandom; + +/** + * 加权最少活跃调用优先 + */ +public class LeastActiveLoadBalance { + + public static final String NAME = "leastactive"; +// +// private static final ConcurrentMap SERVICE_STATISTICS = new ConcurrentHashMap(); + // 要找出 这个个服务列表里,活跃度最少的一个服务器,并且选中,0 我们要根据权重去做加权随机, + public Server select(List serverList){ + // 权重、最少使用、响应时间 + int length = serverList.size(); + int leastActive = -1; + int leastCount = 0; + int[] leastIndexes = new int[length]; + int[] weights = new int[length]; + int totalWeight = 0; + int firstWeight = 0; + boolean sameWeight = true; + + int offsetWeight; + int leastIndex; + + // 第一步 找到当前 活跃度最少的服务,如果相同则通过 加权随机算法选择一个 + for (offsetWeight = 0;offsetWeight < length; ++offsetWeight) { + // 根据下标去拿服务 + Server server = serverList.get(offsetWeight); + // 获取当前服务的最少活跃度 + leastIndex = RpcStatus.getStatus(server.getIp()).getActive(); + // 获取当前服务的权重 + int afterWarmup = server.getWeight(); + // 把权重存在数组里,到时候用于动态判断 + weights[offsetWeight] = afterWarmup; + // 不是第一次访问 并且 当前服务实例大于等于 最近一次服务实例 连接数 + if(leastActive != -1 && leastIndex >= leastActive){ + // 当前服务实例的 最少活跃度 等于 最近服务实例的活跃度 + if(leastIndex == leastActive){ + // 保存每个服务实例的 最少活跃度 + leastIndexes[leastCount++] = offsetWeight; + // 计算总权重 + totalWeight += afterWarmup; + // 用于判断 + if(sameWeight && afterWarmup != firstWeight){ + sameWeight = false; + } + } + }else { + leastActive = leastIndex; + leastCount = 1; + leastIndexes[0] = offsetWeight; + totalWeight = afterWarmup; + firstWeight = afterWarmup; + sameWeight = true; + } + } + // 如果是只有一个服务,有一个最少活跃度的服务被选中, + if(leastCount == 1){ + return serverList.get(leastIndexes[0]); + }else { + // 权重不相同,并且 总权重大于0 + if (!sameWeight && totalWeight > 0) { + // 根据总权重去拿一个 随机数, 加权随机算法 + offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight); + + for(int i = 0; i < leastCount; ++i) { + leastIndex = leastIndexes[i]; + offsetWeight -= weights[leastIndex]; + if (offsetWeight < 0) { + return serverList.get(leastIndex); + } + } + } + return serverList.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]); + } + + } + + public static void main(String[] args) { + List serverList = new ArrayList<>(); + serverList.add(new Server(1,"服务器2","8080","127.0.0.1",90,100)); + serverList.add(new Server(2,"服务器1","8080","127.0.0.2",60,110)); + serverList.add(new Server(3,"服务器3","8080","127.0.0.3",50,120)); + serverList.add(new Server(4,"服务器4","8080","127.0.0.4",40,130)); + serverList.add(new Server(5,"服务器5","8080","127.0.0.5",30,140)); + serverList.add(new Server(6,"服务器6","8080","127.0.0.6",20,150)); + serverList.add(new Server(7,"服务器7","8080","127.0.0.7",10,160)); + + LeastActiveLoadBalance loadBalance = new LeastActiveLoadBalance(); + + for (int i = 0; i < 10; i++) { + Server server = loadBalance.select(serverList); + RpcStatus.beginCount(server.getIp()); + System.out.println("被选中的服务器:" + server.toString()); + } + Iterator iterator = RpcStatus.SERVICE_STATISTICS.entrySet().iterator(); + while (iterator.hasNext()){ + Map.Entry entry = (Map.Entry) iterator.next(); + System.out.println("Key: "+entry.getKey()+" Value: "+entry.getValue().toString() + " avg:" + entry.getValue().getAverageElapsed()); + } + +// for (int i = 0; i < 20; i++) { +// Server server = loadBalance.select(serverList); +// RpcStatus.beginCount(server.getIp()); +// RpcStatus.endCount(RpcStatus.getStatus(server.getIp()), server.getElapsed(),true); +// System.out.println(server.toString()); +// } +// +// iterator = RpcStatus.SERVICE_STATISTICS.entrySet().iterator(); +// while (iterator.hasNext()){ +// Map.Entry entry = (Map.Entry) iterator.next(); +// System.out.println("Key: "+entry.getKey()+" Value: "+entry.getValue().toString() + " avg:" + entry.getValue().getAverageElapsed()); +// } + + } + +} diff --git a/src/main/java/edu/algorithm/loadbalance/RandomLoadBalance.java b/src/main/java/edu/algorithm/loadbalance/RandomLoadBalance.java new file mode 100644 index 0000000..445fb68 --- /dev/null +++ b/src/main/java/edu/algorithm/loadbalance/RandomLoadBalance.java @@ -0,0 +1,73 @@ +package edu.algorithm.loadbalance; + +import edu.algorithm.entity.Server; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; + +public class RandomLoadBalance { + + + private boolean needWeightLoadBalance = false; + + protected int getWeight(Server server){ + int weight; + return Math.max(weight = server.getWeight() > 0 ? server.getWeight() : 100, 0); + } + + public Server select(List serverList){ + int length = serverList.size(); + if(!needWeightLoadBalance){ + return serverList.get(ThreadLocalRandom.current().nextInt(length)); + }else { + boolean sameWeight = true; + int[] weights = new int[length]; + int totalWeight = 0; + + int offset; + int i; + for(offset = 0; offset < length; ++offset) { + i = this.getWeight(serverList.get(offset)); + totalWeight += i; + weights[offset] = totalWeight; + // 目的是为了算出整个列表里的权重是存在差异的 + // 通过 总权重 是否等于 当前权重 * index 去校验是否存在差异 + if (sameWeight && totalWeight != i * (offset + 1)) { + sameWeight = false; + } + } + if(totalWeight > 0 && !sameWeight){ + // 计算权重综合sum,在1和sum 之间选择一个offset。 + offset = ThreadLocalRandom.current().nextInt(totalWeight); + // 遍历整个权重list,如果遇到大于 offset 就选择该项 + for(i = 0; i < length; ++i) { + if (offset < weights[i]) { + return serverList.get(i); + } + } + } + return serverList.get(ThreadLocalRandom.current().nextInt(length)); + } + } + + + public static void main(String[] args) throws InterruptedException { + List serverList = new ArrayList<>(); + serverList.add(new Server(1,"服务器1","8080","192.168.2.2",80)); + serverList.add(new Server(2,"服务器2","8080","192.168.2.5",30)); + serverList.add(new Server(3,"服务器3","8080","192.168.2.8",40)); + serverList.add(new Server(4,"服务器4","8080","192.168.3.2",10)); + serverList.add(new Server(5,"服务器5","8080","192.168.3.5",89)); + serverList.add(new Server(6,"服务器6","8080","192.168.3.8",60)); + + RandomLoadBalance loadBalance = new RandomLoadBalance(); + loadBalance.needWeightLoadBalance = true; + Server server = loadBalance.select(serverList); + System.out.println(server.toString()); + } + + + +} diff --git a/src/main/java/edu/algorithm/loadbalance/RoundRobinLoadBalance.java b/src/main/java/edu/algorithm/loadbalance/RoundRobinLoadBalance.java new file mode 100644 index 0000000..1d0e2a6 --- /dev/null +++ b/src/main/java/edu/algorithm/loadbalance/RoundRobinLoadBalance.java @@ -0,0 +1,122 @@ +package edu.algorithm.loadbalance; + +import edu.algorithm.entity.Server; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + +public class RoundRobinLoadBalance { + + protected static class WeightedRoundRobin { + // 当前服务的默认权重 + private int weight; + // 当前服务的权重 + private AtomicLong current = new AtomicLong(0L); + private long lastUpdate; + protected WeightedRoundRobin() { + } + public int getWeight() { + return this.weight; + } + // 重新初始化权重 + public void setWeight(int weight) { + this.weight = weight; + this.current.set(0L); + } + // 累加服务的权重 + public long increaseCurrent() { + return this.current.addAndGet((long)this.weight); + } + // 当前服务的权重 减去 总权重 + public void sel(int total) { + this.current.addAndGet((long)(-1 * total)); + } + public long getLastUpdate() { + return this.lastUpdate; + } + public void setLastUpdate(long lastUpdate) { + this.lastUpdate = lastUpdate; + } + } + private ConcurrentMap> methodWeightMap = new ConcurrentHashMap(); + + + protected Server select(List serverList){ + String key = "save key" + "edu.algorithm.loadbalance.RoundRobinLoadBalance.select"; + ConcurrentMap map = (ConcurrentMap)this.methodWeightMap.computeIfAbsent(key, (k) -> { + return new ConcurrentHashMap(); + }); + + int totalWeight = 0; + long maxCurrent = Long.MIN_VALUE; + long now = System.currentTimeMillis(); + int weight; + Server selectedSever = null; + WeightedRoundRobin selectedWRR = null; + + for (Iterator var12= serverList.iterator(); var12.hasNext(); totalWeight += weight){ + // 选择当前的服务 + Server server = (Server) var12.next(); + // 拿到权重 + weight = server.getWeight(); + int finalWeight = weight; + // 把当前的服务封装成一个 WeightedRoundRobin 用于记录自身被选择的次数以及用于加权轮询选择 + WeightedRoundRobin weightedRoundRobin = (WeightedRoundRobin)map.computeIfAbsent(server.getIp(), (k) -> { + WeightedRoundRobin wrr = new WeightedRoundRobin(); + wrr.setWeight(finalWeight); + return wrr; + }); + + // 判断当前服务的权重如果不等于服务的权重,刷新权重 + if (weight != weightedRoundRobin.getWeight()) { + weightedRoundRobin.setWeight(weight); + } + // 增加当前权重,当前没有被使用就会一直累计 current += weight + long cur = weightedRoundRobin.increaseCurrent(); + weightedRoundRobin.setLastUpdate(now); + // 通过这个判断,我们可以筛选出这个集合里 权重最高的一个服务实例 + if (cur > maxCurrent) { + maxCurrent = cur; + selectedSever = server; + selectedWRR = weightedRoundRobin; + } + } + // 如果一个服务超过 6w 毫秒 (1分钟)没有被访问,1分钟=60乘以1000=60000毫秒。就移除 +// if (serverList.size() != map.size()) { +// map.entrySet().removeIf((item) -> { +// return now - ((WeightedRoundRobin)item.getValue()).getLastUpdate() > 60000L; +// }); +// } + + if(selectedSever != null){ + selectedWRR.sel(totalWeight); + return selectedSever; + }else { + return serverList.get(0); + } + + } + + public static void main(String[] args) throws Exception { + List serverList = new ArrayList<>(); + // 1.收集可用服务器列表 + serverList.add(new Server(1,"服务器1","8080","192.168.2.2",80)); + serverList.add(new Server(2,"服务器2","8080","192.168.2.5",30)); + serverList.add(new Server(3,"服务器3","8080","192.168.2.8",40)); + serverList.add(new Server(4,"服务器4","8080","192.168.3.2",20)); + serverList.add(new Server(5,"服务器5","8080","192.168.3.5",99)); + serverList.add(new Server(6,"服务器6","8080","192.168.3.8",60)); + + RoundRobinLoadBalance robinLoadBalance = new RoundRobinLoadBalance(); + + for (int i = 0; i < 10; i++) { + Server server = robinLoadBalance.select(serverList); + System.out.println(server.toString()); + } + + } +} diff --git a/src/main/java/edu/algorithm/loadbalance/ShortestResponseLoadBalance.java b/src/main/java/edu/algorithm/loadbalance/ShortestResponseLoadBalance.java new file mode 100644 index 0000000..5a79f81 --- /dev/null +++ b/src/main/java/edu/algorithm/loadbalance/ShortestResponseLoadBalance.java @@ -0,0 +1,170 @@ +package edu.algorithm.loadbalance; + +import edu.algorithm.entity.NamedThreadFactory; +import edu.algorithm.entity.RpcStatus; +import edu.algorithm.entity.Server; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; + +public class ShortestResponseLoadBalance { + + public static final String NAME = "shortestresponse"; + private int slidePeriod = 30000; + private ConcurrentMap methodMap = new ConcurrentHashMap(); + private AtomicBoolean onResetSlideWindow = new AtomicBoolean(false); + private volatile long lastUpdateTime = System.currentTimeMillis(); + private ExecutorService executorService = Executors + .newCachedThreadPool(new NamedThreadFactory("Dubbo-framework-shared-handler", true)); + + + public static class SlideWindowData { + private long succeededOffset; + private long succeededElapsedOffset; + public RpcStatus rpcStatus; + + public SlideWindowData(RpcStatus rpcStatus) { + this.rpcStatus = rpcStatus; + this.succeededOffset = 0L; + this.succeededElapsedOffset = 0L; + } + + public void reset() { + this.succeededOffset = this.rpcStatus.getSucceeded(); + this.succeededElapsedOffset = this.rpcStatus.getSucceededElapsed(); + } + + private long getSucceededAverageElapsed() { + long succeed = this.rpcStatus.getSucceeded() - this.succeededOffset; + return succeed == 0L ? 0L : (this.rpcStatus.getSucceededElapsed() - this.succeededElapsedOffset) / succeed; + } + + public long getEstimateResponse() { + int active = this.rpcStatus.getActive() + 1; + return this.getSucceededAverageElapsed() * (long)active; + } + + public String toString(){ + + return "SlideWindowData{" + + "succeededOffset:" + this.succeededOffset + " " + + "succeededElapsedOffset:" + this.succeededElapsedOffset + " " + + "rpcStatus:" + this.rpcStatus.toString() + + "}"; + } + } + + public ShortestResponseLoadBalance() { + } + + + protected Server select(List serverList) { + // 获取服务列表的List + int length = serverList.size(); + long shortestResponse = Long.MAX_VALUE; + int shortestCount = 0; + // 最快响应时间 + int[] shortestIndexes = new int[length]; + // 权重 + int[] weights = new int[length]; + int totalWeight = 0; + int firstWeight = 0; + boolean sameWeight = true; + + int offsetWeight; + for(offsetWeight = 0; offsetWeight < length; ++offsetWeight) { + // 获取当前服务实例 + Server server = serverList.get(offsetWeight); + // 获取当前服务实例的状态数据 + RpcStatus rpcStatus = RpcStatus.getStatus(server.getIp()); + // 通过状态数据拿到 当前服务的 滑动窗口数据 + SlideWindowData slideWindowData = (SlideWindowData)this.methodMap.computeIfAbsent(server.getIp(), (key) -> { + return new SlideWindowData(rpcStatus); + }); + // 通过滑动窗口拿到 预估的响应时间 + long estimateResponse = slideWindowData.getEstimateResponse(); + // 获取当前服务的权重 + int afterWarmup = server.getWeight(); + // 把服务的权重保存起来 + weights[offsetWeight] = afterWarmup; + // 如果当前服务的 预估响应时间 小于 当前最短的响应时间 + if (estimateResponse < shortestResponse) { + // 更改赋值 + shortestResponse = estimateResponse; + shortestCount = 1; + shortestIndexes[0] = offsetWeight; + totalWeight = afterWarmup; + firstWeight = afterWarmup; + sameWeight = true; + } else if (estimateResponse == shortestResponse) { + // 如果预估的响应时间 和当前的相等, 添加访问次数,设置权重 + shortestIndexes[shortestCount++] = offsetWeight; + // 增加整体权重 + totalWeight += afterWarmup; + if (sameWeight && offsetWeight > 0 && afterWarmup != firstWeight) { + sameWeight = false; + } + } + } + // 30s 开启一个定时任务 去重置 滑动窗口 + if (System.currentTimeMillis() - this.lastUpdateTime > (long)this.slidePeriod && + this.onResetSlideWindow.compareAndSet(false, true)) { + this.executorService.execute(() -> { + this.methodMap.values().forEach(SlideWindowData::reset); + this.lastUpdateTime = System.currentTimeMillis(); + this.onResetSlideWindow.set(false); + }); + } + // 得出最短响应时间的服务器 + if (shortestCount == 1) { + return serverList.get(shortestIndexes[0]); + } else { + + if (!sameWeight && totalWeight > 0) { + offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight); + for(int i = 0; i < shortestCount; ++i) { + int shortestIndex = shortestIndexes[i]; + offsetWeight -= weights[shortestIndex]; + if (offsetWeight < 0) { + return serverList.get(shortestIndex); + } + } + } + + return serverList.get(shortestIndexes[ThreadLocalRandom.current().nextInt(shortestCount)]); + } + } + + + public static void main(String[] args) { + List serverList = new ArrayList<>(); + serverList.add(new Server(1,"服务器1","8080","127.0.0.1",90,60)); + serverList.add(new Server(2,"服务器2","8080","127.0.0.2",80,80)); + serverList.add(new Server(3,"服务器3","8080","127.0.0.3",70,90)); + serverList.add(new Server(4,"服务器4","8080","127.0.0.4",60,60)); + serverList.add(new Server(5,"服务器5","8080","127.0.0.5",50,80)); + serverList.add(new Server(6,"服务器6","8080","127.0.0.6",40,60)); + serverList.add(new Server(7,"服务器7","8080","127.0.0.7",30,80)); + + ShortestResponseLoadBalance loadBalance = new ShortestResponseLoadBalance(); + for (int i = 0; i < 10; i++) { + Server server = loadBalance.select(serverList); + RpcStatus.beginCount(server.getIp()); + RpcStatus.endCount(RpcStatus.getStatus(server.getIp()), server.getElapsed(),true); + System.out.println("被选中的服务器:" + server.toString()); + } + + Iterator iterator = loadBalance.methodMap.entrySet().iterator(); + + while (iterator.hasNext()){ + Map.Entry entry = (Map.Entry) iterator.next(); + System.out.println("key" + entry.getKey() +",value :" + entry.getValue().toString()); + } + } + + +} diff --git a/src/main/java/edu/algorithm/untils/FastJsonUtil.java b/src/main/java/edu/algorithm/untils/FastJsonUtil.java new file mode 100644 index 0000000..c33871a --- /dev/null +++ b/src/main/java/edu/algorithm/untils/FastJsonUtil.java @@ -0,0 +1,11 @@ +package edu.algorithm.untils; + +import com.alibaba.fastjson.JSONObject; + +public class FastJsonUtil { + + + public static void main(String[] args) { + Object o = JSONObject.parseObject("{id:1}",Object.class); + } +}