From 7078bd3360684b9c6d85bc9ef80bd2c4a5f254bf Mon Sep 17 00:00:00 2001 From: Alexander Kalankhodzhaev Date: Wed, 9 Feb 2022 18:49:51 +0300 Subject: [PATCH 1/3] WsProvider refactoring --- .../transport/ProviderInterface.java | 22 +++ .../transport/ws/WsProvider.java | 182 +++++++++++------- .../transport/ws/WsProviderProxyTest.java | 12 +- .../transport/ws/WsProviderTest.java | 23 +-- 4 files changed, 143 insertions(+), 96 deletions(-) diff --git a/transport/src/main/java/com/strategyobject/substrateclient/transport/ProviderInterface.java b/transport/src/main/java/com/strategyobject/substrateclient/transport/ProviderInterface.java index 921eee4b..420ef299 100644 --- a/transport/src/main/java/com/strategyobject/substrateclient/transport/ProviderInterface.java +++ b/transport/src/main/java/com/strategyobject/substrateclient/transport/ProviderInterface.java @@ -41,6 +41,18 @@ public interface ProviderInterface { */ Runnable on(ProviderInterfaceEmitted type, EventListener sub); + /** + * Send data to the node + * + * @param method The RPC methods to execute + * @param params Encoded parameters as applicable for the method + * @param isCacheable Request can be cached + * @return future containing result + */ + CompletableFuture send(String method, + List params, + boolean isCacheable); + /** * Send data to the node * @@ -51,6 +63,16 @@ public interface ProviderInterface { CompletableFuture send(String method, // TODO replace `Object` to something like `JObject` to have more strict contract List params); + /** + * Send data to the node + * + * @param method The RPC methods to execute + * @param isCacheable Request can be cached + * @return future containing result + */ + CompletableFuture send(String method, + boolean isCacheable); + /** * Send data to the node * diff --git a/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/WsProvider.java b/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/WsProvider.java index a38b349e..d1bccb01 100644 --- a/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/WsProvider.java +++ b/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/WsProvider.java @@ -2,6 +2,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.cache.CacheBuilder; import com.strategyobject.substrateclient.common.eventemitter.EventEmitter; import com.strategyobject.substrateclient.common.eventemitter.EventListener; import com.strategyobject.substrateclient.transport.ProviderInterface; @@ -12,16 +13,14 @@ import com.strategyobject.substrateclient.transport.coder.JsonRpcResponseSubscription; import com.strategyobject.substrateclient.transport.coder.RpcCoder; import lombok.*; +import lombok.extern.slf4j.Slf4j; import org.java_websocket.client.WebSocketClient; import org.java_websocket.framing.CloseFrame; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.net.URI; import java.net.URISyntaxException; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @Getter @@ -40,9 +39,9 @@ public WsStateSubscription(BiConsumer callBack, } } -@AllArgsConstructor @Getter @Setter +@AllArgsConstructor class WsStateAwaiting { private CompletableFuture callback; private String method; @@ -50,19 +49,21 @@ class WsStateAwaiting { private SubscriptionHandler subscription; } +@Slf4j public class WsProvider implements ProviderInterface, AutoCloseable { - private static final Logger logger = LoggerFactory.getLogger(WsProvider.class); + private static final int CALL_CACHE_CAPACITY = 384; private static final int RESUBSCRIBE_TIMEOUT = 20; private static final Map ALIASES = new HashMap<>(); - private static final ScheduledExecutorService timedOutHandlerCleaner = Executors - .newScheduledThreadPool(1); + private static final ScheduledExecutorService timedOutHandlerCleaner; static { + timedOutHandlerCleaner = Executors.newScheduledThreadPool(1); ALIASES.put("chain_finalisedHead", "chain_finalizedHead"); ALIASES.put("chain_subscribeFinalisedHeads", "chain_subscribeFinalizedHeads"); ALIASES.put("chain_unsubscribeFinalisedHeads", "chain_unsubscribeFinalizedHeads"); } + private final Map> callCache; private final RpcCoder coder = new RpcCoder(); private final URI endpoint; private final Map headers; @@ -71,9 +72,10 @@ public class WsProvider implements ProviderInterface, AutoCloseable { private final Map subscriptions = new ConcurrentHashMap<>(); private final Map waitingForId = new ConcurrentHashMap<>(); private final int heartbeatInterval; - private final AtomicReference webSocket = new AtomicReference<>(null); private final long responseTimeoutInMs; private int autoConnectMs; + private volatile WebSocketClient webSocket = null; + private volatile CompletableFuture whenConnected = null; private volatile boolean isConnected = false; WsProvider(@NonNull URI endpoint, @@ -93,10 +95,10 @@ public class WsProvider implements ProviderInterface, AutoCloseable { this.headers = headers; this.heartbeatInterval = heartbeatInterval; this.responseTimeoutInMs = responseTimeoutInMs; - - if (autoConnectMs > 0) { - this.connect(); - } + this.callCache = CacheBuilder.newBuilder() + .maximumSize(CALL_CACHE_CAPACITY) + .>build() + .asMap(); } public static Builder builder() { @@ -128,31 +130,34 @@ public boolean isConnected() { *

The {@link com.strategyobject.substrateclient.transport.ws.WsProvider} connects automatically by default, * however if you decided otherwise, you may connect manually using this method. */ - public CompletableFuture connect() { + public synchronized CompletableFuture connect() { + var alreadyConnected = this.whenConnected; + if (alreadyConnected != null) { + return alreadyConnected; + } + val whenConnected = new CompletableFuture(); try { - Preconditions.checkState( - this.webSocket.compareAndSet( - null, - WebSocket.builder() - .setServerUri(this.endpoint) - .setHttpHeaders(this.headers) - .onClose(this::onSocketClose) - .onError(this::onSocketError) - .onMessage(this::onSocketMessage) - .onOpen(this::onSocketOpen) - .build())); - - val webSocket = this.webSocket.get(); - webSocket.setConnectionLostTimeout(this.heartbeatInterval); - + val ws = WebSocket.builder() + .setServerUri(this.endpoint) + .setHttpHeaders(this.headers) + .onClose(this::onSocketClose) + .onError(this::onSocketError) + .onMessage(this::onSocketMessage) + .onOpen(this::onSocketOpen) + .build(); + ws.setConnectionLostTimeout(this.heartbeatInterval); + + this.webSocket = ws; + this.whenConnected = whenConnected; this.eventEmitter.once(ProviderInterfaceEmitted.CONNECTED, _x -> whenConnected.complete(null)); - webSocket.connect(); + ws.connect(); } catch (Exception ex) { - logger.error("Connect error", ex); - this.emit(ProviderInterfaceEmitted.ERROR, ex); + log.error("Connect error", ex); whenConnected.completeExceptionally(ex); + this.whenConnected = null; + this.emit(ProviderInterfaceEmitted.ERROR, ex); } return whenConnected; @@ -167,19 +172,9 @@ public void disconnect() { // switch off autoConnect, we are in manual mode now this.autoConnectMs = 0; - try { - this.webSocket.updateAndGet(ws -> { - if (ws != null) { - ws.close(CloseFrame.NORMAL); - } - - return null; - }); - - } catch (Exception ex) { - logger.error("Error disconnecting", ex); - this.emit(ProviderInterfaceEmitted.ERROR, ex); - throw ex; + val ws = this.webSocket; + if (ws != null) { + ws.close(CloseFrame.NORMAL); } } @@ -197,31 +192,60 @@ public Runnable on(ProviderInterfaceEmitted type, EventListener sub) { return () -> this.eventEmitter.removeListener(type, sub); } + @SuppressWarnings("unchecked") private CompletableFuture send(String method, List params, + boolean isCacheable, SubscriptionHandler subscription) { + val jsonRpcRequest = this.coder.encodeObject(method, params); + val json = RpcCoder.encodeJson(jsonRpcRequest); + + var result = isCacheable ? (CompletableFuture) this.callCache.get(json) : null; + if (result != null && !result.isCompletedExceptionally()) { + log.debug("Cached result for {}", json); + return result; + } + + val ws = this.webSocket; Preconditions.checkState( - this.webSocket.get() != null && this.isConnected, + ws != null && this.isConnected, "WebSocket is not connected"); - val jsonRpcRequest = this.coder.encodeObject(method, params); - val json = RpcCoder.encodeJson(jsonRpcRequest); val id = jsonRpcRequest.getId(); - - logger.debug("Calling {} {}, {}, {}, {}", id, method, params, json, subscription); + log.debug("Calling {} {}, {}, {}, {}", id, method, params, json, subscription); val whenResponseReceived = new CompletableFuture(); this.handlers.put(id, new WsStateAwaiting<>(whenResponseReceived, method, params, subscription)); - return CompletableFuture.runAsync(() -> this.webSocket.get().send(json)) + result = CompletableFuture.runAsync(() -> ws.send(json)) .whenCompleteAsync((_res, ex) -> { if (ex != null) { this.handlers.remove(id); + whenResponseReceived.completeExceptionally(ex); } else { scheduleCleanupIfNoResponseWithinTimeout(id); } }) .thenCombineAsync(whenResponseReceived, (_a, b) -> b); + + if (isCacheable) { + callCache.put(json, result); + } + + return result; + } + + /** + * Send JSON data using WebSockets to configured endpoint + * + * @param method The RPC methods to execute + * @param params Encoded parameters as applicable for the method + * @param isCacheable Request can be cached + * @return future containing result + */ + @Override + public CompletableFuture send(String method, List params, boolean isCacheable) { + return send(method, params, isCacheable, null); } /** @@ -233,7 +257,19 @@ private CompletableFuture send(String method, */ @Override public CompletableFuture send(String method, List params) { - return send(method, params, null); + return send(method, params, false, null); + } + + /** + * Send JSON data using WebSockets to configured endpoint + * + * @param method The RPC methods to execute + * @param isCacheable Request can be cached + * @return future containing result + */ + @Override + public CompletableFuture send(String method, boolean isCacheable) { + return send(method, null, false, null); } /** @@ -244,7 +280,7 @@ public CompletableFuture send(String method, List params) { */ @Override public CompletableFuture send(String method) { - return send(method, null, null); + return send(method, null, false, null); } /** @@ -260,7 +296,7 @@ public CompletableFuture subscribe(String type, String method, List params, BiConsumer callback) { - return this.send(method, params, new SubscriptionHandler(callback, type)); + return this.send(method, params, false, new SubscriptionHandler(callback, type)); } /** @@ -281,13 +317,13 @@ public CompletableFuture unsubscribe(String type, String method, String // a slight complication in solving - since we cannot rely on the sent id, but rather // need to find the actual subscription id to map it if (this.subscriptions.get(subscription) == null) { - logger.info("Unable to find active subscription={}", subscription); + log.info("Unable to find active subscription={}", subscription); whenUnsubscribed.complete(false); } else { this.subscriptions.remove(subscription); - if (this.isConnected() && this.webSocket.get() != null) { - return this.send(method, Collections.singletonList(id), null); + if (this.isConnected() && this.webSocket != null) { + return this.send(method, Collections.singletonList(id), false, null); } whenUnsubscribed.complete(true); @@ -322,39 +358,41 @@ private void onSocketClose(int code, String reason) { reason = ErrorCodes.getWSErrorString(code); } + val ws = this.webSocket; val errorMessage = String.format( "Disconnected from %s code: '%s' reason: '%s'", - this.webSocket.get() == null ? this.endpoint : this.webSocket.get().getURI(), + ws == null ? this.endpoint : ws.getURI(), code, reason); if (this.autoConnectMs > 0) { - logger.error(errorMessage); + log.error(errorMessage); } - this.isConnected = false; - this.webSocket.updateAndGet(_ws -> null); - this.emit(ProviderInterfaceEmitted.DISCONNECTED); - // reject all hanging requests val wsClosedException = new WsClosedException(errorMessage); this.handlers.values().forEach(x -> x.getCallback().completeExceptionally(wsClosedException)); this.handlers.clear(); this.waitingForId.clear(); + this.isConnected = false; + this.whenConnected = null; + this.webSocket = null; + this.emit(ProviderInterfaceEmitted.DISCONNECTED); + if (this.autoConnectMs > 0) { - logger.info("Trying to reconnect to {}", this.endpoint); + log.info("Trying to reconnect to {}", this.endpoint); this.connect(); } } private void onSocketError(Exception ex) { - logger.error("WebSocket error", ex); + log.error("WebSocket error", ex); this.emit(ProviderInterfaceEmitted.ERROR, ex); } private void onSocketMessage(String message) { - logger.debug("Received {}", message); + log.debug("Received {}", message); JsonRpcResponse response = RpcCoder.decodeJson(message); if (Strings.isNullOrEmpty(response.getMethod())) { @@ -369,7 +407,7 @@ private void onSocketMessageResult(JsonRpcResponseSingle response) { val id = response.getId(); val handler = (WsStateAwaiting) this.handlers.get(id); if (handler == null) { - logger.error("Unable to find handler for id={}", id); + log.error("Unable to find handler for id={}", id); return; } @@ -407,13 +445,13 @@ private void onSocketMessageSubscribe(JsonRpcResponseSubscription response) { val method = ALIASES.getOrDefault(response.getMethod(), response.getMethod()); val subId = method + "::" + response.getParams().getSubscription(); - logger.debug("Handling: response =', {}, 'subscription =', {}", response, subId); + log.debug("Handling: response =', {}, 'subscription =', {}", response, subId); val handler = this.subscriptions.get(subId); if (handler == null) { // store the JSON, we could have out-of-order subid coming in this.waitingForId.put(subId, response); - logger.info("Unable to find handler for subscription={}", subId); + log.info("Unable to find handler for subscription={}", subId); return; } @@ -429,7 +467,7 @@ private void onSocketMessageSubscribe(JsonRpcResponseSubscription response) { } public void onSocketOpen() { - logger.info("Connected to: {}", this.webSocket.get().getURI()); + log.info("Connected to: {}", this.webSocket.getURI()); this.isConnected = true; this.emit(ProviderInterfaceEmitted.CONNECTED); @@ -462,7 +500,7 @@ private void resubscribe() { subscription.getParams(), subscription.getCallBack()); } catch (Exception ex) { - logger.error("Resubscribe error {}", subscription, ex); + log.error("Resubscribe error {}", subscription, ex); return null; } }) @@ -470,7 +508,7 @@ private void resubscribe() { .toArray(CompletableFuture[]::new) ).get(RESUBSCRIBE_TIMEOUT, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException ex) { - logger.error("Resubscribe error", ex); + log.error("Resubscribe error", ex); } } @@ -478,7 +516,7 @@ public static class Builder { private URI endpoint; private int autoConnectMs = 2500; private Map headers = null; - private int heartbeatInterval = 60; + private int heartbeatInterval = 30; private long responseTimeoutInMs = 20000; Builder() { diff --git a/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderProxyTest.java b/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderProxyTest.java index c95fe73c..c4582b3e 100644 --- a/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderProxyTest.java +++ b/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderProxyTest.java @@ -32,19 +32,19 @@ public class WsProviderProxyTest { .withNetwork(network) .withNetworkAliases("toxiproxy"); private static final int HEARTBEAT_INTERVAL = 5; - private static final int WAIT_TIMEOUT = HEARTBEAT_INTERVAL * 2; + private static final int WAIT_TIMEOUT = HEARTBEAT_INTERVAL * 3; final ToxiproxyContainer.ContainerProxy proxy = toxiproxy.getProxy(substrate, 9944); @Test + @SneakyThrows void canReconnect() { try (val wsProvider = WsProvider.builder() .setEndpoint(getWsAddress()) .setHeartbeatsInterval(HEARTBEAT_INTERVAL) .build()) { - await() - .atMost(WAIT_TIMEOUT, TimeUnit.SECONDS) - .until(wsProvider::isConnected); + wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS); + assertTrue(wsProvider.isConnected()); proxy.setConnectionCut(true); await() @@ -68,7 +68,9 @@ void canAutoConnectWhenServerAvailable() { .disableHeartbeats() .build()) { - Thread.sleep(WAIT_TIMEOUT * 1000); + assertThrows( + TimeoutException.class, + () -> wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS)); assertFalse(wsProvider.isConnected()); proxy.setConnectionCut(false); diff --git a/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderTest.java b/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderTest.java index 5ed7f217..c6544370 100644 --- a/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderTest.java +++ b/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderTest.java @@ -38,29 +38,14 @@ void canConnect() { } @Test - void connectFailsWhenConnected() { + void connectReturnsSameFutureWhenCalledMultiple() { try (val wsProvider = WsProvider.builder() .setEndpoint(substrate.getWsAddress()) .build()) { + val connectA = wsProvider.connect(); + val connectB = wsProvider.connect(); - val executionException = assertThrows( - ExecutionException.class, - () -> wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS)); - assertTrue(executionException.getCause() instanceof IllegalStateException); - } - } - - @Test - void canAutoConnect() { - try (val wsProvider = WsProvider.builder() - .setEndpoint(substrate.getWsAddress()) - .setAutoConnectDelay(5000) - .build()) { - - assertDoesNotThrow( - () -> await() - .atMost(WAIT_TIMEOUT, TimeUnit.SECONDS) - .until(wsProvider::isConnected)); + assertEquals(connectA, connectB); } } From 288740fa1c9cd7458308a8644b8b65157613f605 Mon Sep 17 00:00:00 2001 From: Alexander Kalankhodzhaev Date: Thu, 10 Feb 2022 10:46:33 +0300 Subject: [PATCH 2/3] WsProvider refactoring --- build.gradle | 2 +- .../transport/ProviderInterface.java | 30 +--- .../transport/ProviderStatus.java | 8 ++ .../transport/ws/WsProvider.java | 135 +++++++++--------- .../transport/ws/WsProviderTest.java | 41 ++++++ 5 files changed, 122 insertions(+), 94 deletions(-) create mode 100644 transport/src/main/java/com/strategyobject/substrateclient/transport/ProviderStatus.java diff --git a/build.gradle b/build.gradle index 1c70b5c5..db9e5fa4 100644 --- a/build.gradle +++ b/build.gradle @@ -5,7 +5,7 @@ plugins { allprojects { group = 'com.strategyobject.substrateclient' - version = '0.0.2-SNAPSHOT' + version = '0.0.3-SNAPSHOT' repositories { mavenLocal() diff --git a/transport/src/main/java/com/strategyobject/substrateclient/transport/ProviderInterface.java b/transport/src/main/java/com/strategyobject/substrateclient/transport/ProviderInterface.java index 420ef299..521fc2e8 100644 --- a/transport/src/main/java/com/strategyobject/substrateclient/transport/ProviderInterface.java +++ b/transport/src/main/java/com/strategyobject/substrateclient/transport/ProviderInterface.java @@ -14,6 +14,12 @@ public interface ProviderInterface { */ boolean hasSubscriptions(); + + /** + * @return Current status + */ + ProviderStatus getStatus(); + /** * Whether the node is connected or not * @@ -30,7 +36,7 @@ public interface ProviderInterface { /** * Manually disconnect from the connection, clearing auto-connect logic */ - void disconnect(); + CompletableFuture disconnect(); /** * Subscribe to provider events @@ -41,18 +47,6 @@ public interface ProviderInterface { */ Runnable on(ProviderInterfaceEmitted type, EventListener sub); - /** - * Send data to the node - * - * @param method The RPC methods to execute - * @param params Encoded parameters as applicable for the method - * @param isCacheable Request can be cached - * @return future containing result - */ - CompletableFuture send(String method, - List params, - boolean isCacheable); - /** * Send data to the node * @@ -63,16 +57,6 @@ CompletableFuture send(String method, CompletableFuture send(String method, // TODO replace `Object` to something like `JObject` to have more strict contract List params); - /** - * Send data to the node - * - * @param method The RPC methods to execute - * @param isCacheable Request can be cached - * @return future containing result - */ - CompletableFuture send(String method, - boolean isCacheable); - /** * Send data to the node * diff --git a/transport/src/main/java/com/strategyobject/substrateclient/transport/ProviderStatus.java b/transport/src/main/java/com/strategyobject/substrateclient/transport/ProviderStatus.java new file mode 100644 index 00000000..8be076d5 --- /dev/null +++ b/transport/src/main/java/com/strategyobject/substrateclient/transport/ProviderStatus.java @@ -0,0 +1,8 @@ +package com.strategyobject.substrateclient.transport; + +public enum ProviderStatus { + CONNECTING, + CONNECTED, + DISCONNECTING, + DISCONNECTED +} diff --git a/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/WsProvider.java b/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/WsProvider.java index d1bccb01..8eefa9be 100644 --- a/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/WsProvider.java +++ b/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/WsProvider.java @@ -2,11 +2,11 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; -import com.google.common.cache.CacheBuilder; import com.strategyobject.substrateclient.common.eventemitter.EventEmitter; import com.strategyobject.substrateclient.common.eventemitter.EventListener; import com.strategyobject.substrateclient.transport.ProviderInterface; import com.strategyobject.substrateclient.transport.ProviderInterfaceEmitted; +import com.strategyobject.substrateclient.transport.ProviderStatus; import com.strategyobject.substrateclient.transport.SubscriptionHandler; import com.strategyobject.substrateclient.transport.coder.JsonRpcResponse; import com.strategyobject.substrateclient.transport.coder.JsonRpcResponseSingle; @@ -51,7 +51,6 @@ class WsStateAwaiting { @Slf4j public class WsProvider implements ProviderInterface, AutoCloseable { - private static final int CALL_CACHE_CAPACITY = 384; private static final int RESUBSCRIBE_TIMEOUT = 20; private static final Map ALIASES = new HashMap<>(); private static final ScheduledExecutorService timedOutHandlerCleaner; @@ -63,7 +62,6 @@ public class WsProvider implements ProviderInterface, AutoCloseable { ALIASES.put("chain_unsubscribeFinalisedHeads", "chain_unsubscribeFinalizedHeads"); } - private final Map> callCache; private final RpcCoder coder = new RpcCoder(); private final URI endpoint; private final Map headers; @@ -73,10 +71,11 @@ public class WsProvider implements ProviderInterface, AutoCloseable { private final Map waitingForId = new ConcurrentHashMap<>(); private final int heartbeatInterval; private final long responseTimeoutInMs; - private int autoConnectMs; + private volatile int autoConnectMs; private volatile WebSocketClient webSocket = null; private volatile CompletableFuture whenConnected = null; - private volatile boolean isConnected = false; + private volatile CompletableFuture whenDisconnected = null; + private volatile ProviderStatus status = ProviderStatus.DISCONNECTED; WsProvider(@NonNull URI endpoint, int autoConnectMs, @@ -95,10 +94,6 @@ public class WsProvider implements ProviderInterface, AutoCloseable { this.headers = headers; this.heartbeatInterval = heartbeatInterval; this.responseTimeoutInMs = responseTimeoutInMs; - this.callCache = CacheBuilder.newBuilder() - .maximumSize(CALL_CACHE_CAPACITY) - .>build() - .asMap(); } public static Builder builder() { @@ -115,6 +110,11 @@ public boolean hasSubscriptions() { return true; } + @Override + public ProviderStatus getStatus() { + return this.status; + } + /** * {@inheritDoc} * @@ -122,7 +122,7 @@ public boolean hasSubscriptions() { */ @Override public boolean isConnected() { - return this.isConnected; + return this.status == ProviderStatus.CONNECTED; } /** @@ -131,12 +131,19 @@ public boolean isConnected() { * however if you decided otherwise, you may connect manually using this method. */ public synchronized CompletableFuture connect() { - var alreadyConnected = this.whenConnected; - if (alreadyConnected != null) { - return alreadyConnected; + Preconditions.checkState( + this.status == ProviderStatus.DISCONNECTED || this.status == ProviderStatus.CONNECTING, + "WebSocket is already connected"); + + var inProgress = this.whenConnected; + if (inProgress != null) { + return inProgress; } + this.status = ProviderStatus.CONNECTING; + val whenConnected = new CompletableFuture(); + this.whenConnected = whenConnected; try { val ws = WebSocket.builder() @@ -150,14 +157,17 @@ public synchronized CompletableFuture connect() { ws.setConnectionLostTimeout(this.heartbeatInterval); this.webSocket = ws; - this.whenConnected = whenConnected; - this.eventEmitter.once(ProviderInterfaceEmitted.CONNECTED, _x -> whenConnected.complete(null)); + this.eventEmitter.once(ProviderInterfaceEmitted.CONNECTED, _x -> { + whenConnected.complete(null); + this.whenConnected = null; + }); ws.connect(); } catch (Exception ex) { log.error("Connect error", ex); whenConnected.completeExceptionally(ex); - this.whenConnected = null; this.emit(ProviderInterfaceEmitted.ERROR, ex); + this.whenConnected = null; + this.status = ProviderStatus.DISCONNECTED; } return whenConnected; @@ -167,15 +177,35 @@ public synchronized CompletableFuture connect() { * {@inheritDoc} */ @Override - public void disconnect() { - this.isConnected = false; + public synchronized CompletableFuture disconnect() { + Preconditions.checkState( + this.status == ProviderStatus.CONNECTED || this.status == ProviderStatus.DISCONNECTING, + "WebSocket is not connected"); + + var inProgress = this.whenDisconnected; + if (inProgress != null) { + return inProgress; + } + + this.status = ProviderStatus.DISCONNECTING; + + val whenDisconnected = new CompletableFuture(); + this.whenDisconnected = whenDisconnected; + // switch off autoConnect, we are in manual mode now this.autoConnectMs = 0; + this.eventEmitter.once(ProviderInterfaceEmitted.DISCONNECTED, _x -> { + whenDisconnected.complete(null); + this.whenDisconnected = null; + }); + val ws = this.webSocket; if (ws != null) { ws.close(CloseFrame.NORMAL); } + + return whenDisconnected; } /** @@ -192,32 +222,23 @@ public Runnable on(ProviderInterfaceEmitted type, EventListener sub) { return () -> this.eventEmitter.removeListener(type, sub); } - @SuppressWarnings("unchecked") private CompletableFuture send(String method, List params, - boolean isCacheable, SubscriptionHandler subscription) { - val jsonRpcRequest = this.coder.encodeObject(method, params); - val json = RpcCoder.encodeJson(jsonRpcRequest); - - var result = isCacheable ? (CompletableFuture) this.callCache.get(json) : null; - if (result != null && !result.isCompletedExceptionally()) { - log.debug("Cached result for {}", json); - return result; - } - val ws = this.webSocket; Preconditions.checkState( - ws != null && this.isConnected, + ws != null && this.isConnected(), "WebSocket is not connected"); + val jsonRpcRequest = this.coder.encodeObject(method, params); + val json = RpcCoder.encodeJson(jsonRpcRequest); val id = jsonRpcRequest.getId(); log.debug("Calling {} {}, {}, {}, {}", id, method, params, json, subscription); val whenResponseReceived = new CompletableFuture(); this.handlers.put(id, new WsStateAwaiting<>(whenResponseReceived, method, params, subscription)); - result = CompletableFuture.runAsync(() -> ws.send(json)) + return CompletableFuture.runAsync(() -> ws.send(json)) .whenCompleteAsync((_res, ex) -> { if (ex != null) { this.handlers.remove(id); @@ -227,25 +248,6 @@ private CompletableFuture send(String method, } }) .thenCombineAsync(whenResponseReceived, (_a, b) -> b); - - if (isCacheable) { - callCache.put(json, result); - } - - return result; - } - - /** - * Send JSON data using WebSockets to configured endpoint - * - * @param method The RPC methods to execute - * @param params Encoded parameters as applicable for the method - * @param isCacheable Request can be cached - * @return future containing result - */ - @Override - public CompletableFuture send(String method, List params, boolean isCacheable) { - return send(method, params, isCacheable, null); } /** @@ -257,19 +259,7 @@ public CompletableFuture send(String method, List params, boolea */ @Override public CompletableFuture send(String method, List params) { - return send(method, params, false, null); - } - - /** - * Send JSON data using WebSockets to configured endpoint - * - * @param method The RPC methods to execute - * @param isCacheable Request can be cached - * @return future containing result - */ - @Override - public CompletableFuture send(String method, boolean isCacheable) { - return send(method, null, false, null); + return send(method, params, null); } /** @@ -280,7 +270,7 @@ public CompletableFuture send(String method, boolean isCacheable) { */ @Override public CompletableFuture send(String method) { - return send(method, null, false, null); + return send(method, null, null); } /** @@ -296,7 +286,7 @@ public CompletableFuture subscribe(String type, String method, List params, BiConsumer callback) { - return this.send(method, params, false, new SubscriptionHandler(callback, type)); + return this.send(method, params, new SubscriptionHandler(callback, type)); } /** @@ -323,7 +313,7 @@ public CompletableFuture unsubscribe(String type, String method, String } else { this.subscriptions.remove(subscription); if (this.isConnected() && this.webSocket != null) { - return this.send(method, Collections.singletonList(id), false, null); + return this.send(method, Collections.singletonList(id), null); } whenUnsubscribed.complete(true); @@ -375,9 +365,8 @@ private void onSocketClose(int code, String reason) { this.handlers.clear(); this.waitingForId.clear(); - this.isConnected = false; - this.whenConnected = null; this.webSocket = null; + this.status = ProviderStatus.DISCONNECTED; this.emit(ProviderInterfaceEmitted.DISCONNECTED); if (this.autoConnectMs > 0) { @@ -469,14 +458,20 @@ private void onSocketMessageSubscribe(JsonRpcResponseSubscription response) { public void onSocketOpen() { log.info("Connected to: {}", this.webSocket.getURI()); - this.isConnected = true; + this.status = ProviderStatus.CONNECTED; this.emit(ProviderInterfaceEmitted.CONNECTED); this.resubscribe(); } @Override public void close() { - this.disconnect(); + try { + if (this.status == ProviderStatus.CONNECTED || this.status == ProviderStatus.DISCONNECTING) { + this.disconnect(); + } + } catch (Exception ex) { + log.error("Error while automatic closing", ex); + } } private void resubscribe() { diff --git a/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderTest.java b/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderTest.java index c6544370..dcf25c27 100644 --- a/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderTest.java +++ b/transport/src/test/java/com/strategyobject/substrateclient/transport/ws/WsProviderTest.java @@ -4,6 +4,7 @@ import com.strategyobject.substrateclient.tests.containers.SubstrateVersion; import com.strategyobject.substrateclient.tests.containers.TestSubstrateContainer; import com.strategyobject.substrateclient.transport.ProviderInterfaceEmitted; +import com.strategyobject.substrateclient.transport.ProviderStatus; import lombok.SneakyThrows; import lombok.val; import org.junit.jupiter.api.Test; @@ -42,6 +43,7 @@ void connectReturnsSameFutureWhenCalledMultiple() { try (val wsProvider = WsProvider.builder() .setEndpoint(substrate.getWsAddress()) .build()) { + val connectA = wsProvider.connect(); val connectB = wsProvider.connect(); @@ -98,6 +100,23 @@ void canDisconnect() { } } + @Test + @SneakyThrows + void disconnectReturnsSameFutureWhenCalledMultiple() { + try (val wsProvider = WsProvider.builder() + .setEndpoint(substrate.getWsAddress()) + .build()) { + + wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS); + assertTrue(wsProvider.isConnected()); + + val disconnectA = wsProvider.disconnect(); + val disconnectB = wsProvider.disconnect(); + + assertEquals(disconnectA, disconnectB); + } + } + @Test @SneakyThrows void notifiesWhenDisconnected() { @@ -206,7 +225,29 @@ void supportsSubscriptions() { .setEndpoint(substrate.getWsAddress()) .disableAutoConnect() .build()) { + assertTrue(wsProvider.hasSubscriptions()); } } + + @Test + @SneakyThrows + void canReconnectManually() { + try (val wsProvider = WsProvider.builder() + .setEndpoint(substrate.getWsAddress()) + .disableAutoConnect() + .build()) { + + wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS); + assertTrue(wsProvider.isConnected()); + + wsProvider.disconnect().get(WAIT_TIMEOUT, TimeUnit.SECONDS); + assertFalse(wsProvider.isConnected()); + assertEquals(ProviderStatus.DISCONNECTED, wsProvider.getStatus()); + + wsProvider.connect().get(WAIT_TIMEOUT, TimeUnit.SECONDS); + assertTrue(wsProvider.isConnected()); + assertEquals(ProviderStatus.CONNECTED, wsProvider.getStatus()); + } + } } \ No newline at end of file From b56ac3ffacb7eb5b52c29fea26dd12472e25d79e Mon Sep 17 00:00:00 2001 From: Alexander Kalankhodzhaev Date: Thu, 10 Feb 2022 20:50:38 +0300 Subject: [PATCH 3/3] WsProvider refactoring --- .../transport/ws/WsProvider.java | 35 ++++++++++++------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/WsProvider.java b/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/WsProvider.java index 8eefa9be..e6d925b1 100644 --- a/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/WsProvider.java +++ b/transport/src/main/java/com/strategyobject/substrateclient/transport/ws/WsProvider.java @@ -131,8 +131,9 @@ public boolean isConnected() { * however if you decided otherwise, you may connect manually using this method. */ public synchronized CompletableFuture connect() { + val currentStatus = this.status; Preconditions.checkState( - this.status == ProviderStatus.DISCONNECTED || this.status == ProviderStatus.CONNECTING, + currentStatus == ProviderStatus.DISCONNECTED || currentStatus == ProviderStatus.CONNECTING, "WebSocket is already connected"); var inProgress = this.whenConnected; @@ -178,27 +179,24 @@ public synchronized CompletableFuture connect() { */ @Override public synchronized CompletableFuture disconnect() { + val currentStatus = this.status; + var inProgress = this.whenDisconnected; + Preconditions.checkState( - this.status == ProviderStatus.CONNECTED || this.status == ProviderStatus.DISCONNECTING, + currentStatus == ProviderStatus.CONNECTED || + (currentStatus == ProviderStatus.DISCONNECTING && inProgress != null), "WebSocket is not connected"); - var inProgress = this.whenDisconnected; if (inProgress != null) { return inProgress; } - this.status = ProviderStatus.DISCONNECTING; - val whenDisconnected = new CompletableFuture(); this.whenDisconnected = whenDisconnected; // switch off autoConnect, we are in manual mode now this.autoConnectMs = 0; - - this.eventEmitter.once(ProviderInterfaceEmitted.DISCONNECTED, _x -> { - whenDisconnected.complete(null); - this.whenDisconnected = null; - }); + this.status = ProviderStatus.DISCONNECTING; val ws = this.webSocket; if (ws != null) { @@ -343,7 +341,12 @@ private void emit(ProviderInterfaceEmitted type, Object... args) { this.eventEmitter.emit(type, args); } - private void onSocketClose(int code, String reason) { + private synchronized void onSocketClose(int code, String reason) { + val currentStatus = this.status; + if (currentStatus == ProviderStatus.CONNECTED || currentStatus == ProviderStatus.CONNECTING) { + this.status = ProviderStatus.DISCONNECTING; + } + if (Strings.isNullOrEmpty(reason)) { reason = ErrorCodes.getWSErrorString(code); } @@ -368,6 +371,11 @@ private void onSocketClose(int code, String reason) { this.webSocket = null; this.status = ProviderStatus.DISCONNECTED; this.emit(ProviderInterfaceEmitted.DISCONNECTED); + val whenDisconnected = this.whenDisconnected; + if (whenDisconnected != null) { + whenDisconnected.complete(null); + this.whenDisconnected = null; + } if (this.autoConnectMs > 0) { log.info("Trying to reconnect to {}", this.endpoint); @@ -455,7 +463,7 @@ private void onSocketMessageSubscribe(JsonRpcResponseSubscription response) { } } - public void onSocketOpen() { + public synchronized void onSocketOpen() { log.info("Connected to: {}", this.webSocket.getURI()); this.status = ProviderStatus.CONNECTED; @@ -466,7 +474,8 @@ public void onSocketOpen() { @Override public void close() { try { - if (this.status == ProviderStatus.CONNECTED || this.status == ProviderStatus.DISCONNECTING) { + val currentStatus = this.status; + if (currentStatus == ProviderStatus.CONNECTED || currentStatus == ProviderStatus.DISCONNECTING) { this.disconnect(); } } catch (Exception ex) {