From f2a3af4f9747882e295833cb6c62f12c2ac8c31a Mon Sep 17 00:00:00 2001 From: Nida Karim Ali Date: Sat, 16 Sep 2023 14:27:30 +0500 Subject: [PATCH 1/5] made socket client sync for callbacks --- pyproject.toml | 2 +- src/bluefin_v2_client/sockets_lib.py | 181 +++++++++++---------------- 2 files changed, 77 insertions(+), 106 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 567e2d5..ca4af0c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "bluefin_v2_client" -version = "2.6.0" +version = "2.7.0" description = "Library to interact with Bluefin exchange protocol including its off-chain api-gateway and on-chain contracts" readme = "README.md" requires-python = ">=3.8" diff --git a/src/bluefin_v2_client/sockets_lib.py b/src/bluefin_v2_client/sockets_lib.py index 3a4f2f0..f00876a 100644 --- a/src/bluefin_v2_client/sockets_lib.py +++ b/src/bluefin_v2_client/sockets_lib.py @@ -2,8 +2,7 @@ import time from .enumerations import MARKET_SYMBOLS, SOCKET_EVENTS import asyncio - -sio = socketio.Client() +sio = socketio.AsyncClient() class Sockets: @@ -16,37 +15,38 @@ def __init__(self, url, timeout=10, token=None) -> None: self.api_token = "" return - def _establish_connection(self): + async def _establish_connection(self): """ - Connects to the desired url + Connects to the desired url """ try: - sio.connect(self.url, wait_timeout=self.timeout, transports=["websocket"]) + await sio.connect(self.url, wait_timeout=self.timeout, + transports=["websocket"]) return True except: return False def set_token(self, token): """ - Sets default user token - Inputs: - - token (user auth token): Bluefin onboarding token. + Sets default user token + Inputs: + - token (user auth token): firefly onboarding token. """ self.token = token def set_api_token(self, token): """ - Sets default user token - Inputs: - - token (user auth token): Bluefin onboarding token. + Sets default user token + Inputs: + - token (user auth token): firefly onboarding token. """ self.api_token = token async def open(self): """ - opens socket instance connection + opens socket instance connection """ - self.connection_established = self._establish_connection() + self.connection_established = await self._establish_connection() if not self.connection_established: await self.close() raise (Exception("Failed to connect to Host: {}".format(self.url))) @@ -54,71 +54,64 @@ async def open(self): async def close(self): """ - closes the socket instance connection + closes the socket instance connection """ - sio.disconnect() + await sio.disconnect() return @sio.on("*") - def listener(event, data): + async def listener(event, data): """ - Listens to all events emitted by the server + Listens to all events emitted by the server """ try: if event in Sockets.callbacks.keys(): - Sockets.callbacks[event](data) + await Sockets.callbacks[event](data) elif "default" in Sockets.callbacks.keys(): - Sockets.callbacks["default"]({"event": event, "data": data}) + await Sockets.callbacks["default"]({"event": event, "data": data}) else: pass except: pass return - + @sio.event - def connect(): + async def connect(): print("Connected To Socket Server") - # add 10 seconds sleep to allow connection to be established before callbacks for connections are executed - if "connect" in Sockets.callbacks: + if 'connect' in Sockets.callbacks: # Execute the callback using asyncio.run() if available - time.sleep(10) - asyncio.run(Sockets.callbacks["connect"]()) + await Sockets.callbacks['connect']() + @sio.event - def disconnect(): - print("Disconnected From Socket Server") - if "disconnect" in Sockets.callbacks: + async def disconnect(): + print('Disconnected From Socket Server') + if 'disconnect' in Sockets.callbacks: # Execute the callback using asyncio.run() if available - asyncio.run(Sockets.callbacks["disconnect"]()) + await Sockets.callbacks['disconnect']() + + async def listen(self, event, callback): """ - Assigns callbacks to desired events + Assigns callbacks to desired events """ Sockets.callbacks[event] = callback return async def subscribe_global_updates_by_symbol(self, symbol: MARKET_SYMBOLS): """ - Allows user to subscribe to global updates for the desired symbol. - Inputs: - - symbol: market symbol of market user wants global updates for. (e.g. DOT-PERP) + Allows user to subscribe to global updates for the desired symbol. + Inputs: + - symbol: market symbol of market user wants global updates for. (e.g. DOT-PERP) """ try: - if not self.connection_established: - raise Exception( - "Socket connection is established, invoke socket.open()" - ) - - resp = sio.call( - "SUBSCRIBE", - [ - { - "e": SOCKET_EVENTS.GLOBAL_UPDATES_ROOM.value, - "p": symbol.value, - }, - ], - ) + resp = await sio.call('SUBSCRIBE', [ + { + "e": SOCKET_EVENTS.GLOBAL_UPDATES_ROOM.value, + "p": symbol.value, + }, + ]) return resp["success"] except Exception as e: print("Error: ", e) @@ -126,84 +119,62 @@ async def subscribe_global_updates_by_symbol(self, symbol: MARKET_SYMBOLS): async def unsubscribe_global_updates_by_symbol(self, symbol: MARKET_SYMBOLS): """ - Allows user to unsubscribe to global updates for the desired symbol. - Inputs: - - symbol: market symbol of market user wants to remove global updates for. (e.g. DOT-PERP) + Allows user to unsubscribe to global updates for the desired symbol. + Inputs: + - symbol: market symbol of market user wants to remove global updates for. (e.g. DOT-PERP) """ try: - if not self.connection_established: - return False - - resp = sio.call( - "UNSUBSCRIBE", - [ - { - "e": SOCKET_EVENTS.GLOBAL_UPDATES_ROOM.value, - "p": symbol.value, - }, - ], - ) + resp = await sio.call('UNSUBSCRIBE', [ + { + "e": SOCKET_EVENTS.GLOBAL_UPDATES_ROOM.value, + "p": symbol.value, + }, + ]) return resp["success"] except Exception as e: print(e) return False - async def subscribe_user_update_by_token( - self, parent_account: str = None, user_token: str = None - ) -> bool: + async def subscribe_user_update_by_token(self, parent_account: str = None, user_token: str = None) -> bool: """ - Allows user to subscribe to their account updates. - Inputs: - - parent_account(str): address of parent account. Only whitelisted - sub-account can listen to its parent account position updates - - token(str): auth token generated when onboarding on Bluefin + Allows user to subscribe to their account updates. + Inputs: + - parent_account(str): address of parent account. Only whitelisted + sub-account can listen to its parent account position updates + - token(str): auth token generated when onboarding on firefly """ try: - if not self.connection_established: - return False - - resp = sio.call( - "SUBSCRIBE", - [ - { - "e": SOCKET_EVENTS.USER_UPDATES_ROOM.value, - "pa": parent_account, - "t": self.token if user_token == None else user_token, - "rt": self.api_token, - }, - ], - ) + resp = await sio.call("SUBSCRIBE", [ + { + "e": SOCKET_EVENTS.USER_UPDATES_ROOM.value, + 'pa': parent_account, + "t": self.token if user_token == None else user_token, + "rt": self.api_token + }, + ]) return resp["success"] except Exception as e: print(e) return False - async def unsubscribe_user_update_by_token( - self, parent_account: str = None, user_token: str = None - ): + async def unsubscribe_user_update_by_token(self, parent_account: str = None, user_token: str = None): """ - Allows user to unsubscribe to their account updates. - Inputs: - - parent_account(str): address of parent account. Only for sub-accounts - - token: auth token generated when onboarding on Bluefin + Allows user to unsubscribe to their account updates. + Inputs: + - parent_account(str): address of parent account. Only for sub-accounts + - token: auth token generated when onboarding on firefly """ try: - if not self.connection_established: - return False - - resp = sio.call( - "UNSUBSCRIBE", - [ - { - "e": SOCKET_EVENTS.USER_UPDATES_ROOM.value, - "pa": parent_account, - "t": self.token if user_token == None else user_token, - "rt": self.api_token, - }, - ], - ) + resp = await sio.call("UNSUBSCRIBE", [ + { + "e": SOCKET_EVENTS.USER_UPDATES_ROOM.value, + 'pa': parent_account, + "t": self.token if user_token == None else user_token, + "rt": self.api_token + }, + ]) return resp["success"] except: return False From 0331b5e1d6c0fcb081e675db0a6f94a8fc2372e1 Mon Sep 17 00:00:00 2001 From: Nida Karim Ali Date: Sat, 16 Sep 2023 15:04:46 +0500 Subject: [PATCH 2/5] updated example --- examples/13.orderbook_updates.py | 105 ++++++++++++++++--------------- 1 file changed, 56 insertions(+), 49 deletions(-) diff --git a/examples/13.orderbook_updates.py b/examples/13.orderbook_updates.py index fbdfc5f..815b02a 100644 --- a/examples/13.orderbook_updates.py +++ b/examples/13.orderbook_updates.py @@ -1,13 +1,10 @@ -""" -When ever the state of orderbook changes, an event is emitted by exchange. -In this code example we open a socket connection and listen to orderbook update event -""" +### +# Places a market order on exchange and listens to emitted events +## import sys, os - sys.path.append(os.getcwd() + "/src/") import time from config import TEST_ACCT_KEY, TEST_NETWORK -import asyncio from bluefin_v2_client import ( BluefinClient, Networks, @@ -15,39 +12,16 @@ SOCKET_EVENTS, ORDER_SIDE, ORDER_TYPE, + ORDER_STATUS, OrderSignatureRequest, ) - +import asyncio event_received = False -async def place_limit_order(client: BluefinClient): - # default leverage of account is set to 3 on Bluefin - user_leverage = await client.get_user_leverage(MARKET_SYMBOLS.ETH) - - # creates a LIMIT order to be signed - signature_request = OrderSignatureRequest( - symbol=MARKET_SYMBOLS.ETH, # market symbol - price=1300, # price at which you want to place order - quantity=0.01, # quantity - side=ORDER_SIDE.SELL, - orderType=ORDER_TYPE.LIMIT, - leverage=user_leverage, - ) - # create signed order - signed_order = client.create_signed_order(signature_request) - - print("Placing a limit order") - # place signed order on orderbook - resp = await client.post_signed_order(signed_order) - - # returned order with PENDING state - print(resp) - return - - async def main(): + client = BluefinClient(True, Networks[TEST_NETWORK], TEST_ACCT_KEY) await client.init(True) @@ -56,36 +30,69 @@ def callback(event): print("Event data:", event) event_received = True - # must open socket before subscribing - print("Making socket connection to Bluefin exchange") + async def connection_callback(): + # This callback will be invoked as soon as the socket connection is established + # subscribe to global event updates for BTC market + status = await client.socket.subscribe_global_updates_by_symbol(MARKET_SYMBOLS.ETH) + print("Subscribed to global ETH events: {}".format(status)) + + # triggered when orderbook updates are received + print("Listening to orderbook updates") + await client.socket.listen(SOCKET_EVENTS.ORDERBOOK_UPDATE.value, callback) + + async def disconnection_callback(): + print("Sockets disconnected, performing actions...") + resp = await client.cancel_all_orders(MARKET_SYMBOLS.ETH, [ORDER_STATUS.OPEN, ORDER_STATUS.PARTIAL_FILLED]) + print(resp) + + +# must specify connection_callback before opening the sockets below + await client.socket.listen("connect", connection_callback) + await client.socket.listen("disconnect", disconnection_callback) + + print("Making socket connection to firefly exchange") await client.socket.open() - # subscribe to global event updates for ETH market - await client.socket.subscribe_global_updates_by_symbol(MARKET_SYMBOLS.ETH) - print("Subscribed to ETH Market events") + ######## Placing an Order ######## + + # default leverage of account is set to 3 on firefly + user_leverage = await client.get_user_leverage(MARKET_SYMBOLS.ETH) - print("Listening to ETH Orderbook update event") - await client.socket.listen(SOCKET_EVENTS.ORDERBOOK_UPDATE.value, callback) + # creates a MARKET order to be signed + signature_request = OrderSignatureRequest( + symbol=MARKET_SYMBOLS.ETH, + leverage=user_leverage, + price=1300, + quantity=0.5, + side=ORDER_SIDE.BUY, + orderType=ORDER_TYPE.LIMIT + ) - await place_limit_order(client) + # create signed order + signed_order = client.create_signed_order(signature_request) - timeout = 30 + print("Placing a market order") + # place signed order on orderbook + resp = await client.post_signed_order(signed_order) + print(resp) + ###### Closing socket connections after 30 seconds ##### + timeout = 10 end_time = time.time() + timeout while not event_received and time.time() < end_time: time.sleep(1) - status = await client.socket.unsubscribe_global_updates_by_symbol( - MARKET_SYMBOLS.ETH - ) - print("Unsubscribed from orderbook update events for ETH Market: {}".format(status)) - # close socket connection + # # close socket connection print("Closing sockets!") await client.socket.close() - await client.close_connections() - if __name__ == "__main__": + ### make sure keep the loop initialization same + # as below to ensure closing the script after receiving + # completion of each callback on socket events ### loop = asyncio.new_event_loop() - loop.run_until_complete(main()) + loop.create_task(main()) + pending = asyncio.all_tasks(loop=loop) + group = asyncio.gather(*pending) + loop.run_until_complete(group) loop.close() From 8b8b6993d09a20dfcca11a5493b77b23bfaa5d62 Mon Sep 17 00:00:00 2001 From: Nida Karim Ali Date: Sat, 16 Sep 2023 15:07:08 +0500 Subject: [PATCH 3/5] updated example --- examples/13.orderbook_updates.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/examples/13.orderbook_updates.py b/examples/13.orderbook_updates.py index 815b02a..91dd136 100644 --- a/examples/13.orderbook_updates.py +++ b/examples/13.orderbook_updates.py @@ -1,6 +1,7 @@ -### -# Places a market order on exchange and listens to emitted events -## +#### +## When ever the state of orderbook changes, an event is emitted by exchange. +## In this code example we open a socket connection and listen to orderbook update event +#### import sys, os sys.path.append(os.getcwd() + "/src/") import time From 79162fca6e9f4bb9410ae1b8ebaa3a204f332676 Mon Sep 17 00:00:00 2001 From: Nida Karim Ali Date: Sat, 16 Sep 2023 15:31:14 +0500 Subject: [PATCH 4/5] updated example --- examples/13.orderbook_updates.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/13.orderbook_updates.py b/examples/13.orderbook_updates.py index 91dd136..2e73690 100644 --- a/examples/13.orderbook_updates.py +++ b/examples/13.orderbook_updates.py @@ -51,12 +51,12 @@ async def disconnection_callback(): await client.socket.listen("connect", connection_callback) await client.socket.listen("disconnect", disconnection_callback) - print("Making socket connection to firefly exchange") + print("Making socket connection to bluefin exchange") await client.socket.open() ######## Placing an Order ######## - # default leverage of account is set to 3 on firefly + # default leverage of account is set to 3 on bluefin user_leverage = await client.get_user_leverage(MARKET_SYMBOLS.ETH) # creates a MARKET order to be signed From 6ed69b94613d6c2144ee8c82392ef13bf4d1f8e7 Mon Sep 17 00:00:00 2001 From: Nida Karim Ali Date: Sat, 16 Sep 2023 15:36:21 +0500 Subject: [PATCH 5/5] updated comments --- examples/13.orderbook_updates.py | 2 +- src/bluefin_v2_client/sockets_lib.py | 15 ++++++--------- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/examples/13.orderbook_updates.py b/examples/13.orderbook_updates.py index 2e73690..d588ed8 100644 --- a/examples/13.orderbook_updates.py +++ b/examples/13.orderbook_updates.py @@ -33,7 +33,7 @@ def callback(event): async def connection_callback(): # This callback will be invoked as soon as the socket connection is established - # subscribe to global event updates for BTC market + # subscribe to global event updates for ETH market status = await client.socket.subscribe_global_updates_by_symbol(MARKET_SYMBOLS.ETH) print("Subscribed to global ETH events: {}".format(status)) diff --git a/src/bluefin_v2_client/sockets_lib.py b/src/bluefin_v2_client/sockets_lib.py index f00876a..f98a908 100644 --- a/src/bluefin_v2_client/sockets_lib.py +++ b/src/bluefin_v2_client/sockets_lib.py @@ -21,7 +21,7 @@ async def _establish_connection(self): """ try: await sio.connect(self.url, wait_timeout=self.timeout, - transports=["websocket"]) + transports=["websocket"]) return True except: return False @@ -30,7 +30,7 @@ def set_token(self, token): """ Sets default user token Inputs: - - token (user auth token): firefly onboarding token. + - token (user auth token): Bluefin onboarding token. """ self.token = token @@ -38,7 +38,7 @@ def set_api_token(self, token): """ Sets default user token Inputs: - - token (user auth token): firefly onboarding token. + - token (user auth token): Bluefin onboarding token. """ self.api_token = token @@ -74,14 +74,13 @@ async def listener(event, data): except: pass return - + @sio.event async def connect(): print("Connected To Socket Server") if 'connect' in Sockets.callbacks: # Execute the callback using asyncio.run() if available await Sockets.callbacks['connect']() - @sio.event async def disconnect(): @@ -89,8 +88,6 @@ async def disconnect(): if 'disconnect' in Sockets.callbacks: # Execute the callback using asyncio.run() if available await Sockets.callbacks['disconnect']() - - async def listen(self, event, callback): """ @@ -142,7 +139,7 @@ async def subscribe_user_update_by_token(self, parent_account: str = None, user_ Inputs: - parent_account(str): address of parent account. Only whitelisted sub-account can listen to its parent account position updates - - token(str): auth token generated when onboarding on firefly + - token(str): auth token generated when onboarding on Bluefin """ try: resp = await sio.call("SUBSCRIBE", [ @@ -164,7 +161,7 @@ async def unsubscribe_user_update_by_token(self, parent_account: str = None, use Allows user to unsubscribe to their account updates. Inputs: - parent_account(str): address of parent account. Only for sub-accounts - - token: auth token generated when onboarding on firefly + - token: auth token generated when onboarding on Bluefin """ try: resp = await sio.call("UNSUBSCRIBE", [