Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>algorithm-course</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<source.level>1.8</source.level>
<target.level>1.8</target.level>
<dubbo.version>3.2.0</dubbo.version>
<junit.version>4.12</junit.version>
<spring.version>5.2.8.RELEASE</spring.version>
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
</properties>

<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>

<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
</dependency>

<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-dependencies-zookeeper</artifactId>
<type>pom</type>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.73</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>RELEASE</version>
<scope>compile</scope>
</dependency>
</dependencies>


<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-framework-bom</artifactId>
<version>${spring.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-bom</artifactId>
<version>${dubbo.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-dependencies-zookeeper</artifactId>
<version>${dubbo.version}</version>
<type>pom</type>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

</project>
38 changes: 38 additions & 0 deletions src/main/java/edu/algorithm/base/RoundRobinLoadBalancer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package edu.algorithm.base;

import java.util.ArrayList;
import java.util.List;

public class RoundRobinLoadBalancer {
private List<String> serverList;
private int currentIndex;

public RoundRobinLoadBalancer(List<String> serverList) {
this.serverList = serverList;
this.currentIndex = 0;
}

public String getNextServer() {
String server = serverList.get(currentIndex);
currentIndex = (currentIndex + 1) % serverList.size();
return server;
}

// 示例使用
public static void main(String[] args) {
// 创建服务器列表
List<String> serverList = new ArrayList<>();
serverList.add("Server1");
serverList.add("Server2");
serverList.add("Server3");

// 创建轮询负载均衡器实例
RoundRobinLoadBalancer loadBalancer = new RoundRobinLoadBalancer(serverList);

// 模拟请求分发
for (int i = 0; i < 10; i++) {
String server = loadBalancer.getNextServer();
System.out.println("Request " + (i + 1) + " sent to server: " + server);
}
}
}
110 changes: 110 additions & 0 deletions src/main/java/edu/algorithm/entity/AdaptiveMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package edu.algorithm.entity;

import org.apache.dubbo.common.utils.StringUtils;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

public class AdaptiveMetrics {

public final ConcurrentMap<String, AdaptiveMetrics> metricsStatistics = new ConcurrentHashMap<>();
//
public long currentProviderTime = 0;
// 是在 ProfilerServerFilter 的 onResponse 方法中经过计算得到的 cpu load 依赖其他组件计算出来的 CPU负载
public double providerCPULoad = 0;
// 是在 setProviderMetrics 方法里面维护的,其中 lastLatency 是在 ProfilerServerFilter 的 onResponse 方法中经过计算得到的 rt 值
// /'leɪtənsɪ/
public long lastLatency = 0;
// 当前时间
public long currentTime = 0;

//Allow some time disorder
public long pickTime = System.currentTimeMillis();

public double beta = 0.5;
// 是总的调用次数
public final AtomicLong consumerReq = new AtomicLong();
// 是在每次调用成功后在 AdaptiveLoadBalanceFilter 的 onResponse 方法中维护的值。
public final AtomicLong consumerSuccess = new AtomicLong();
// 每次出现调用异常时,维护的值
public final AtomicLong errorReq = new AtomicLong();
// 这是一个公式算出来的值 Vt = β * Vt-1 + (1 - β ) * θt
// 指数加权移动平均值的控制图 (exponentially weighted moving average) , EWMA主要用于对网络的状态参数进行估计和平滑 , 负责得到 当前服务器的“平滑负载指标”
// 有两个特点,1.不需要保存过去所有数值,2.计算量显著减少
public double ewma = 0;

public double getLoad(String idKey, int weight, int timeout) {
AdaptiveMetrics metrics = getStatus(idKey);

//If the time more than 2 times, mandatory selected 如果超时时间超过 2次,则强制选择
if (System.currentTimeMillis() - metrics.pickTime > timeout * 2) {
return 0;
}

if (metrics.currentTime > 0) {
long multiple = (System.currentTimeMillis() - metrics.currentTime) / timeout + 1;
if (multiple > 0) {
if (metrics.currentProviderTime == metrics.currentTime) {
//penalty value
metrics.lastLatency = timeout * 2L;
} else {
metrics.lastLatency = metrics.lastLatency >> multiple;
}
metrics.ewma = metrics.beta * metrics.ewma + (1 - metrics.beta) * metrics.lastLatency;
metrics.currentTime = System.currentTimeMillis();
}
}

long inflight = metrics.consumerReq.get() - metrics.consumerSuccess.get() - metrics.errorReq.get();
// Vt = β * Vt-1 + (1 - β ) * θt
// 服务器的CPU负载 乘以 ( (响应时间) * 当前正在处理的请求数 + 1) / ( 请求成功次数 ) / (( 请求次数 +1)*权重 +1)
return metrics.providerCPULoad * (Math.sqrt(metrics.ewma) + 1) * (inflight + 1) / ((((double) metrics.consumerSuccess.get() / (double) (metrics.consumerReq.get() + 1)) * weight) + 1);
}

public AdaptiveMetrics getStatus(String idKey) {
return ConcurrentHashMapUtils.computeIfAbsent(metricsStatistics, idKey, k -> new AdaptiveMetrics());
}

public void addConsumerReq(String idKey) {
AdaptiveMetrics metrics = getStatus(idKey);
metrics.consumerReq.incrementAndGet();
}

public void addConsumerSuccess(String idKey) {
AdaptiveMetrics metrics = getStatus(idKey);
metrics.consumerSuccess.incrementAndGet();
}

public void addErrorReq(String idKey) {
AdaptiveMetrics metrics = getStatus(idKey);
metrics.errorReq.incrementAndGet();
}

public void setPickTime(String idKey, long time) {
AdaptiveMetrics metrics = getStatus(idKey);
metrics.pickTime = time;
}


public void setProviderMetrics(String idKey, Map<String, String> metricsMap) {

AdaptiveMetrics metrics = getStatus(idKey);

long serviceTime = Long.parseLong(Optional.ofNullable(metricsMap.get("curTime")).filter(v -> StringUtils.isNumeric(v, false)).orElse("0"));
//If server time is less than the current time, discard
if (metrics.currentProviderTime > serviceTime) {
return;
}

metrics.currentProviderTime = serviceTime;
metrics.currentTime = serviceTime;
metrics.providerCPULoad = Double.parseDouble(Optional.ofNullable(metricsMap.get("load")).filter(v -> StringUtils.isNumeric(v, true)).orElse("0"));
metrics.lastLatency = Long.parseLong((Optional.ofNullable(metricsMap.get("rt")).filter(v -> StringUtils.isNumeric(v, false)).orElse("0")));

metrics.beta = 0.5;
metrics.ewma = metrics.beta * metrics.ewma + (1 - metrics.beta) * metrics.lastLatency;
}
}
42 changes: 42 additions & 0 deletions src/main/java/edu/algorithm/entity/ConcurrentHashMapUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package edu.algorithm.entity;

import org.junit.jupiter.api.condition.JRE;

import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
public class ConcurrentHashMapUtils {

/**
* A temporary workaround for Java 8 ConcurrentHashMap#computeIfAbsent specific performance issue: JDK-8161372.</br>
* @see <a href="https://bugs.openjdk.java.net/browse/JDK-8161372">https://bugs.openjdk.java.net/browse/JDK-8161372</a>
*
*/
public static <K, V> V computeIfAbsent(ConcurrentMap<K, V> map, K key, Function<? super K, ? extends V> func) {
Objects.requireNonNull(func);
if (JRE.JAVA_8.isCurrentVersion()) {
V v = map.get(key);
if (null == v) {
// issue#11986 lock bug
// v = map.computeIfAbsent(key, func);

// this bug fix methods maybe cause `func.apply` multiple calls.
v = func.apply(key);
if(null == v){
return null;
}
final V res = map.putIfAbsent(key, v);
if(null != res){
// if pre value present, means other thread put value already, and putIfAbsent not effect
// return exist value
return res;
}
// if pre value is null, means putIfAbsent effected, return current value
}
return v;
} else {
return map.computeIfAbsent(key, func);
}
}

}
39 changes: 39 additions & 0 deletions src/main/java/edu/algorithm/entity/Invocation.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package edu.algorithm.entity;


import lombok.Data;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

@Data
public class Invocation {

private static final long serialVersionUID = -4355285085441097045L;
private String methodName;
private Class<?>[] parameterTypes;
private Object[] arguments;
private Map<String, Object> attachments;

private transient Map<Object, Object> attributes = Collections.synchronizedMap(new HashMap<>());

public void setAttachment(String key, Object value) {
setObjectAttachment(key, value);
}

public Map<Object, Object> getAttributes() {
return attributes;
}
public void setObjectAttachment(String key, Object value) {
try {

if (attachments == null) {
attachments = new HashMap<>();
}
attachments.put(key, value);
} finally {

}
}
}
39 changes: 39 additions & 0 deletions src/main/java/edu/algorithm/entity/NamedThreadFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package edu.algorithm.entity;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {
protected static final AtomicInteger POOL_SEQ = new AtomicInteger(1);
protected final AtomicInteger mThreadNum;
protected final String mPrefix;
protected final boolean mDaemon;
protected final ThreadGroup mGroup;

public NamedThreadFactory() {
this("pool-" + POOL_SEQ.getAndIncrement(), false);
}

public NamedThreadFactory(String prefix) {
this(prefix, false);
}

public NamedThreadFactory(String prefix, boolean daemon) {
this.mThreadNum = new AtomicInteger(1);
this.mPrefix = prefix + "-thread-";
this.mDaemon = daemon;
SecurityManager s = System.getSecurityManager();
this.mGroup = s == null ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
}

public Thread newThread(Runnable runnable) {
String name = this.mPrefix + this.mThreadNum.getAndIncrement();
Thread ret = new Thread(this.mGroup, runnable, name, 0L);
ret.setDaemon(this.mDaemon);
return ret;
}

public ThreadGroup getThreadGroup() {
return this.mGroup;
}
}
Loading