From 2bc2ab864caa08624b3774d14d824c23397cbbee Mon Sep 17 00:00:00 2001 From: liningrui Date: Thu, 8 Apr 2021 17:59:47 +0800 Subject: [PATCH 1/4] Add a callback 'onBusy' used to adaptive rate limit Change-Id: I2a1139f6e436744ee6b20557ec18f2181b2e63be --- .../baidu/hugegraph/backend/store/raft/RaftNode.java | 7 +++++++ .../backend/store/raft/RaftSharedContext.java | 4 ++-- .../backend/store/raft/StoreStateMachine.java | 12 ++++++++++-- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java index 9a7b3ee978..7c64e1fef0 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java @@ -285,6 +285,13 @@ public void onError(PeerId peer, Status status) { } } + // NOTE: Jraft itself doesn't have this callback, it's added by us + // @Override + public void onBusy(PeerId peer, Status status) { + int count = RaftNode.this.busyCounter.incrementAndGet(); + LOG.info("Increase busy counter: [{}]", count); + } + private boolean isWriteBufferOverflow(Status status) { String expectMsg = "maybe write overflow"; return RaftError.EINTERNAL == status.getRaftError() && diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java index e4c7f44385..56ee6c0548 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java @@ -25,9 +25,9 @@ import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import org.apache.commons.io.FileUtils; @@ -362,7 +362,7 @@ private ExecutorService createBackendExecutor(int threads) { private static ExecutorService newPool(int coreThreads, int maxThreads, String name, RejectedExecutionHandler handler) { - BlockingQueue workQueue = new LinkedBlockingQueue<>(); + BlockingQueue workQueue = new SynchronousQueue<>(); return ThreadPoolUtil.newBuilder() .poolName(name) .enableMetric(false) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java index 2b8981015c..07caa54fa9 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Future; import org.slf4j.Logger; @@ -103,6 +104,7 @@ public void onApply(Iterator iter) { LOG.debug("Node role: {}", this.node().selfIsLeader() ? "leader" : "follower"); StoreClosure closure = null; + List> futures = new ArrayList<>(); try { while (iter.hasNext()) { closure = (StoreClosure) iter.done(); @@ -124,7 +126,7 @@ public void onApply(Iterator iter) { byte[] bytes = iter.getData().array(); // Follower seems no way to wait future // Let the backend thread do it directly - this.context.backendExecutor().submit(() -> { + futures.add(this.context.backendExecutor().submit(() -> { BytesBuffer buffer = LZ4Util.decompress(bytes, RaftSharedContext.BLOCK_SIZE); buffer.forReadWritten(); @@ -137,10 +139,14 @@ public void onApply(Iterator iter) { action, e); throw new BackendException("Backend error", e); } - }); + })); } iter.next(); } + // Follower wait tasks finished + for (Future future : futures) { + future.get(); + } } catch (Throwable e) { LOG.error("StateMachine occured critical error", e); Status status = new Status(RaftError.ESTATEMACHINE, @@ -150,6 +156,7 @@ public void onApply(Iterator iter) { closure.failure(status, e); } // Will cause current node inactive + // TODO: rollback to correct index iter.setErrorAndRollback(1L, status); } } @@ -253,6 +260,7 @@ public void onConfigurationCommitted(Configuration conf) { @Override public void onError(final RaftException e) { + // If busy, spin and wait a moment LOG.error("Raft error: {}", e.getMessage(), e); } } From 3b7837f917cc1d8031dbce7d99e9b23c4ee5b950 Mon Sep 17 00:00:00 2001 From: liningrui Date: Tue, 13 Apr 2021 15:16:37 +0800 Subject: [PATCH 2/4] tiny improve Change-Id: Ifc12c9b408ee24583e42fcd4842915ec2a71e554 --- .../java/com/baidu/hugegraph/backend/store/raft/RaftNode.java | 1 - .../baidu/hugegraph/backend/store/raft/StoreStateMachine.java | 1 - 2 files changed, 2 deletions(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java index 7c64e1fef0..f390ce25c8 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java @@ -286,7 +286,6 @@ public void onError(PeerId peer, Status status) { } // NOTE: Jraft itself doesn't have this callback, it's added by us - // @Override public void onBusy(PeerId peer, Status status) { int count = RaftNode.this.busyCounter.incrementAndGet(); LOG.info("Increase busy counter: [{}]", count); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java index 07caa54fa9..c663f372ad 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java @@ -124,7 +124,6 @@ public void onApply(Iterator iter) { } else { // Follower need readMutation data byte[] bytes = iter.getData().array(); - // Follower seems no way to wait future // Let the backend thread do it directly futures.add(this.context.backendExecutor().submit(() -> { BytesBuffer buffer = LZ4Util.decompress(bytes, From 706c3168474e59e5505c06bfe8bd9fe0228a85f6 Mon Sep 17 00:00:00 2001 From: liningrui Date: Tue, 13 Apr 2021 15:44:55 +0800 Subject: [PATCH 3/4] improve some comment Change-Id: Ib74c3df54ed2f7cbbe27a76d6f344173792bf654 --- .../java/com/baidu/hugegraph/backend/store/raft/RaftNode.java | 4 ++++ .../baidu/hugegraph/backend/store/raft/RaftSharedContext.java | 4 +++- .../baidu/hugegraph/backend/store/raft/StoreStateMachine.java | 1 - 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java index f390ce25c8..5fbbdf10d2 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java @@ -287,6 +287,10 @@ public void onError(PeerId peer, Status status) { // NOTE: Jraft itself doesn't have this callback, it's added by us public void onBusy(PeerId peer, Status status) { + /* + * If follower is busy then increase busy counter, + * it will lead to submit thread wait more time + */ int count = RaftNode.this.busyCounter.incrementAndGet(); LOG.info("Increase busy counter: [{}]", count); } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java index 56ee6c0548..7361de594f 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.nio.file.Paths; import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; @@ -73,6 +74,7 @@ public final class RaftSharedContext { public static final int WAIT_RPC_TIMEOUT = 30 * 60 * 1000; // compress block size public static final int BLOCK_SIZE = 8192; + public static final int QUEUE_SIZE = 10; public static final String DEFAULT_GROUP = "default"; @@ -362,7 +364,7 @@ private ExecutorService createBackendExecutor(int threads) { private static ExecutorService newPool(int coreThreads, int maxThreads, String name, RejectedExecutionHandler handler) { - BlockingQueue workQueue = new SynchronousQueue<>(); + BlockingQueue workQueue = new ArrayBlockingQueue<>(QUEUE_SIZE); return ThreadPoolUtil.newBuilder() .poolName(name) .enableMetric(false) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java index c663f372ad..e450a68979 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java @@ -259,7 +259,6 @@ public void onConfigurationCommitted(Configuration conf) { @Override public void onError(final RaftException e) { - // If busy, spin and wait a moment LOG.error("Raft error: {}", e.getMessage(), e); } } From eed2800050528b76f110057011b92916c469a395 Mon Sep 17 00:00:00 2001 From: liningrui Date: Tue, 13 Apr 2021 20:01:51 +0800 Subject: [PATCH 4/4] tiny improve Change-Id: I2a6a8b39f7066008483fde2c8fb66fa7db32ca0e --- .../com/baidu/hugegraph/auth/ContextGremlinServer.java | 3 ++- .../java/com/baidu/hugegraph/config/ServerOptions.java | 4 ++-- .../baidu/hugegraph/license/LicenseVerifyManager.java | 3 ++- .../backend/store/raft/RaftSharedContext.java | 4 ++-- .../java/com/baidu/hugegraph/config/CoreOptions.java | 4 ++-- .../main/java/com/baidu/hugegraph/util/Consumers.java | 10 +++++----- .../backend/store/rocksdb/RocksDBStdSessions.java | 2 +- 7 files changed, 16 insertions(+), 14 deletions(-) diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/ContextGremlinServer.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/ContextGremlinServer.java index 1d62def94c..271d599343 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/ContextGremlinServer.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/ContextGremlinServer.java @@ -33,6 +33,7 @@ import com.baidu.hugegraph.HugeGraph; import com.baidu.hugegraph.auth.HugeGraphAuthProxy.Context; import com.baidu.hugegraph.auth.HugeGraphAuthProxy.ContextThreadPoolExecutor; +import com.baidu.hugegraph.config.CoreOptions; /** * GremlinServer with custom ServerGremlinExecutor, which can pass Context @@ -76,7 +77,7 @@ public void injectTraversalSource(String prefix) { static ExecutorService newGremlinExecutorService(Settings settings) { if (settings.gremlinPool == 0) { - settings.gremlinPool = Runtime.getRuntime().availableProcessors(); + settings.gremlinPool = CoreOptions.CPUS; } int size = settings.gremlinPool; ThreadFactory factory = ThreadFactoryUtil.create("exec-%d"); diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java index 8b41ea3345..b15b0030ca 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java @@ -71,7 +71,7 @@ public static synchronized ServerOptions instance() { "restserver.max_worker_threads", "The maxmium worker threads of rest server.", rangeInt(2, Integer.MAX_VALUE), - 2 * Runtime.getRuntime().availableProcessors() + 2 * CoreOptions.CPUS ); public static final ConfigOption MIN_FREE_MEMORY = @@ -132,7 +132,7 @@ public static synchronized ServerOptions instance() { "gremlinserver.max_route", "The max route number for gremlin server.", positiveInt(), - 2 * Runtime.getRuntime().availableProcessors() + 2 * CoreOptions.CPUS ); public static final ConfigListOption GRAPHS = diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/license/LicenseVerifyManager.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/license/LicenseVerifyManager.java index 5e0d9e67a5..881abfebda 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/license/LicenseVerifyManager.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/license/LicenseVerifyManager.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; import com.baidu.hugegraph.HugeException; +import com.baidu.hugegraph.config.CoreOptions; import com.baidu.hugegraph.config.HugeConfig; import com.baidu.hugegraph.config.ServerOptions; import com.baidu.hugegraph.core.GraphManager; @@ -218,7 +219,7 @@ private void checkCpu(ExtraParam param) { if (expectCpus == NO_LIMIT) { return; } - int actualCpus = Runtime.getRuntime().availableProcessors(); + int actualCpus = CoreOptions.CPUS; if (actualCpus > expectCpus) { throw newLicenseException( "The server's cpus '%s' exceeded the limit '%s'", diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java index 7361de594f..92fa81a9e3 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java @@ -26,9 +26,9 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import org.apache.commons.io.FileUtils; @@ -74,7 +74,7 @@ public final class RaftSharedContext { public static final int WAIT_RPC_TIMEOUT = 30 * 60 * 1000; // compress block size public static final int BLOCK_SIZE = 8192; - public static final int QUEUE_SIZE = 10; + public static final int QUEUE_SIZE = CoreOptions.CPUS; public static final String DEFAULT_GROUP = "default"; diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/config/CoreOptions.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/config/CoreOptions.java index 9d87993c08..24fcf606ae 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/config/CoreOptions.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/config/CoreOptions.java @@ -30,14 +30,14 @@ public class CoreOptions extends OptionHolder { + public static final int CPUS = Runtime.getRuntime().availableProcessors(); + private CoreOptions() { super(); } private static volatile CoreOptions instance; - private static final int CPUS = Runtime.getRuntime().availableProcessors(); - public static synchronized CoreOptions instance() { if (instance == null) { instance = new CoreOptions(); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/util/Consumers.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/util/Consumers.java index 39d43a5627..69e96ec43c 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/util/Consumers.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/util/Consumers.java @@ -37,12 +37,12 @@ import org.slf4j.Logger; import com.baidu.hugegraph.HugeException; +import com.baidu.hugegraph.config.CoreOptions; import com.baidu.hugegraph.task.TaskManager.ContextCallable; public final class Consumers { - public static final int CPUS = Runtime.getRuntime().availableProcessors(); - public static final int THREADS = 4 + CPUS / 4; + public static final int THREADS = 4 + CoreOptions.CPUS / 4; public static final int QUEUE_WORKER_SIZE = 1000; public static final long CONSUMER_WAKE_PERIOD = 1; @@ -240,8 +240,8 @@ public static ExecutorService newThreadPool(String prefix, int workers) { if (workers < 0) { assert workers == -1; workers = Consumers.THREADS; - } else if (workers > Consumers.CPUS * 2) { - workers = Consumers.CPUS * 2; + } else if (workers > CoreOptions.CPUS * 2) { + workers = CoreOptions.CPUS * 2; } String name = prefix + "-worker-%d"; return ExecutorUtil.newFixedThreadPool(workers, name); @@ -262,7 +262,7 @@ public static RuntimeException wrapException(Throwable e) { public static class ExecutorPool { - private final static int POOL_CAPACITY = 2 * CPUS; + private final static int POOL_CAPACITY = 2 * CoreOptions.CPUS; private final String threadNamePrefix; private final int executorWorkers; diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java index 3e98cda349..c8dbfbcbcb 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java @@ -493,7 +493,7 @@ public static void initOptions(HugeConfig conf, // Optimize RocksDB if (optimize) { - int processors = Runtime.getRuntime().availableProcessors(); + int processors = CoreOptions.CPUS; db.setIncreaseParallelism(Math.max(processors / 2, 1)); db.setAllowConcurrentMemtableWrite(true);