From 8cdec582bc669dc2cbe5ee4ff9bfbcc763764f7e Mon Sep 17 00:00:00 2001 From: useongee <118250509+USEONGEE@users.noreply.github.com> Date: Sun, 10 Aug 2025 17:20:07 +0900 Subject: [PATCH 1/2] feat: add reconnectable websocket manager --- hyperliquid/websocket_manager.py | 147 +++++++++++++++++++++++++++++-- 1 file changed, 139 insertions(+), 8 deletions(-) diff --git a/hyperliquid/websocket_manager.py b/hyperliquid/websocket_manager.py index 4c73a688..44c09c5a 100644 --- a/hyperliquid/websocket_manager.py +++ b/hyperliquid/websocket_manager.py @@ -74,6 +74,50 @@ def ws_msg_to_identifier(ws_msg: WsMsg) -> Optional[str]: return f'activeAssetData:{ws_msg["data"]["coin"].lower()},{ws_msg["data"]["user"].lower()}' +def identifier_to_subscription(identifier: str) -> dict: + if identifier == "allMids": + return {"type": "allMids"} + elif identifier == "userEvents": + return {"type": "userEvents"} + elif identifier == "orderUpdates": + return {"type": "orderUpdates"} + + if identifier.startswith("l2Book:"): + coin = identifier[len("l2Book:") :] + return {"type": "l2Book", "coin": coin} + elif identifier.startswith("trades:"): + coin = identifier[len("trades:") :] + return {"type": "trades", "coin": coin} + elif identifier.startswith("userFills:"): + user = identifier[len("userFills:") :] + return {"type": "userFills", "user": user} + elif identifier.startswith("candle:"): + data = identifier[len("candle:") :] + coin, interval = data.split(",", 1) + return {"type": "candle", "coin": coin, "interval": interval} + elif identifier.startswith("userFundings:"): + user = identifier[len("userFundings:") :] + return {"type": "userFundings", "user": user} + elif identifier.startswith("userNonFundingLedgerUpdates:"): + user = identifier[len("userNonFundingLedgerUpdates:") :] + return {"type": "userNonFundingLedgerUpdates", "user": user} + elif identifier.startswith("webData2:"): + user = identifier[len("webData2:") :] + return {"type": "webData2", "user": user} + elif identifier.startswith("bbo:"): + coin = identifier[len("bbo:") :] + return {"type": "bbo", "coin": coin} + elif identifier.startswith("activeAssetCtx:"): + coin = identifier[len("activeAssetCtx:") :] + return {"type": "activeAssetCtx", "coin": coin} + elif identifier.startswith("activeAssetData:"): + data = identifier[len("activeAssetData:") :] + coin, user = data.split(",", 1) + return {"type": "activeAssetData", "coin": coin, "user": user} + + raise ValueError(f"Unknown subscription identifier: {identifier}") + + class WebsocketManager(threading.Thread): def __init__(self, base_url): super().__init__() @@ -90,14 +134,6 @@ def run(self): self.ping_sender.start() self.ws.run_forever() - def send_ping(self): - while not self.stop_event.wait(50): - if not self.ws.keep_running: - break - logging.debug("Websocket sending ping") - self.ws.send(json.dumps({"method": "ping"})) - logging.debug("Websocket ping sender stopped") - def stop(self): self.stop_event.set() self.ws.close() @@ -160,3 +196,98 @@ def unsubscribe(self, subscription: Subscription, subscription_id: int) -> bool: self.ws.send(json.dumps({"method": "unsubscribe", "subscription": subscription})) self.active_subscriptions[identifier] = new_active_subscriptions return len(active_subscriptions) != len(new_active_subscriptions) + + +class ReconnectableWebsocketManager(WebsocketManager): + def __init__(self, base_url: str, *args, **kwargs): + # Store base_url for reuse during reconnection + self.base_url = base_url + super().__init__(base_url, *args, **kwargs) + # Override close and error handlers to trigger reconnection + self.ws.on_close = self.on_close + self.ws.on_error = self.on_error + # Lock to protect shared state (queued_subscriptions, active_subscriptions) + + def send_ping(self): + interval = 50 + while not self.stop_event.wait(interval): + try: + ws = getattr(self, "ws", None) + # 연결 미준비/끊김이면 전송 스킵 (스레드는 계속 유지) + if not ws or not getattr(ws, "keep_running", False) or not self.ws_ready: + continue + logging.debug("Websocket sending app-level ping") + ws.send(json.dumps({"method": "ping"})) + except Exception: + # 재연결 중 교체 등으로 생길 수 있는 예외는 조용히 스킵 + logging.debug("App-level ping skipped due to connection state", exc_info=True) + logging.debug("Websocket ping sender stopped") + + def on_open(self, _ws): + logging.debug("on_open") + self.ws_ready = True + # Process queued subscriptions and move them to active_subscriptions + for subscription, active_subscription in self.queued_subscriptions: + self.subscribe( + subscription, + active_subscription.callback, + active_subscription.subscription_id, + ) + self.queued_subscriptions.clear() + + def _reconnect(self): + # Mark connection as not ready + self.ws_ready = False + # Move all active subscriptions back into the subscription queue + for identifier, active_subscriptions in self.active_subscriptions.items(): + subscription = identifier_to_subscription(identifier) + for active_subscription in active_subscriptions: + self.subscribe( + subscription, + active_subscription.callback, + active_subscription.subscription_id, + ) + self.active_subscriptions.clear() + + def _start_ping_sender(self): + if not self.ping_sender.is_alive(): + self.ping_sender = threading.Thread(target=self.send_ping, daemon=True) + self.ping_sender.start() + + def on_close(self, ws, close_status_code, close_msg): + logging.debug(f"ReconnectableWebsocketManager on_close: {close_status_code} - {close_msg}") + # If stop_event is set, skip reconnection + if self.stop_event.is_set(): + return + self._reconnect() + + def on_error(self, ws, error): + logging.debug(f"ReconnectableWebsocketManager on_error: {error}") + # If stop_event is set, skip reconnection + if self.stop_event.is_set(): + return + self._reconnect() + + def run(self, *, ping_timeout=15, ping_interval=30, reconnect_interval=5): + # Start the ping sender thread (keeps connection alive) + self._start_ping_sender() + # Main loop to maintain the connection and handle reconnections + while not self.stop_event.is_set(): + logging.debug("ReconnectableWebsocketManager connecting...") + self.ws.run_forever(ping_timeout=ping_timeout, ping_interval=ping_interval) + logging.debug( + f"ReconnectableWebsocketManager disconnected. Reconnecting in {reconnect_interval} seconds..." + ) + # Wait for the reconnection interval or break if stop_event is set + if self.stop_event.wait(reconnect_interval): + break + # Create a new WebSocketApp instance for reconnection + ws_url = "ws" + self.base_url[len("http") :] + "/ws" + self.ws = websocket.WebSocketApp( + ws_url, + on_message=self.on_message, + on_open=self.on_open, + on_close=self.on_close, + on_error=self.on_error, + ) + self._start_ping_sender() From 28a0a4b60bd22805af9073e1981c650425198d11 Mon Sep 17 00:00:00 2001 From: useongee <118250509+USEONGEE@users.noreply.github.com> Date: Sun, 10 Aug 2025 17:26:46 +0900 Subject: [PATCH 2/2] fix: restore original send_ping() implementation in WebsocketManager The previous commit mistakenly modified the original send_ping() method in WebsocketManager. This change restores the method to its original implementation so that the base class remains unmodified. This ensures that the upcoming PR for ReconnectableWebsocketManager remains self-contained and does not alter the behavior of the original WebsocketManager. --- hyperliquid/websocket_manager.py | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/hyperliquid/websocket_manager.py b/hyperliquid/websocket_manager.py index 44c09c5a..a071bbc6 100644 --- a/hyperliquid/websocket_manager.py +++ b/hyperliquid/websocket_manager.py @@ -134,6 +134,14 @@ def run(self): self.ping_sender.start() self.ws.run_forever() + def send_ping(self): + while not self.stop_event.wait(50): + if not self.ws.keep_running: + break + logging.debug("Websocket sending ping") + self.ws.send(json.dumps({"method": "ping"})) + logging.debug("Websocket ping sender stopped") + def stop(self): self.stop_event.set() self.ws.close() @@ -208,21 +216,6 @@ def __init__(self, base_url: str, *args, **kwargs): self.ws.on_error = self.on_error # Lock to protect shared state (queued_subscriptions, active_subscriptions) - def send_ping(self): - interval = 50 - while not self.stop_event.wait(interval): - try: - ws = getattr(self, "ws", None) - # 연결 미준비/끊김이면 전송 스킵 (스레드는 계속 유지) - if not ws or not getattr(ws, "keep_running", False) or not self.ws_ready: - continue - logging.debug("Websocket sending app-level ping") - ws.send(json.dumps({"method": "ping"})) - except Exception: - # 재연결 중 교체 등으로 생길 수 있는 예외는 조용히 스킵 - logging.debug("App-level ping skipped due to connection state", exc_info=True) - logging.debug("Websocket ping sender stopped") - def on_open(self, _ws): logging.debug("on_open") self.ws_ready = True