diff --git a/examples/13.orderbook_updates.py b/examples/13.orderbook_updates.py index fbdfc5f..d588ed8 100644 --- a/examples/13.orderbook_updates.py +++ b/examples/13.orderbook_updates.py @@ -1,13 +1,11 @@ -""" -When ever the state of orderbook changes, an event is emitted by exchange. -In this code example we open a socket connection and listen to orderbook update event -""" +#### +## When ever the state of orderbook changes, an event is emitted by exchange. +## In this code example we open a socket connection and listen to orderbook update event +#### import sys, os - sys.path.append(os.getcwd() + "/src/") import time from config import TEST_ACCT_KEY, TEST_NETWORK -import asyncio from bluefin_v2_client import ( BluefinClient, Networks, @@ -15,39 +13,16 @@ SOCKET_EVENTS, ORDER_SIDE, ORDER_TYPE, + ORDER_STATUS, OrderSignatureRequest, ) - +import asyncio event_received = False -async def place_limit_order(client: BluefinClient): - # default leverage of account is set to 3 on Bluefin - user_leverage = await client.get_user_leverage(MARKET_SYMBOLS.ETH) - - # creates a LIMIT order to be signed - signature_request = OrderSignatureRequest( - symbol=MARKET_SYMBOLS.ETH, # market symbol - price=1300, # price at which you want to place order - quantity=0.01, # quantity - side=ORDER_SIDE.SELL, - orderType=ORDER_TYPE.LIMIT, - leverage=user_leverage, - ) - # create signed order - signed_order = client.create_signed_order(signature_request) - - print("Placing a limit order") - # place signed order on orderbook - resp = await client.post_signed_order(signed_order) - - # returned order with PENDING state - print(resp) - return - - async def main(): + client = BluefinClient(True, Networks[TEST_NETWORK], TEST_ACCT_KEY) await client.init(True) @@ -56,36 +31,69 @@ def callback(event): print("Event data:", event) event_received = True - # must open socket before subscribing - print("Making socket connection to Bluefin exchange") + async def connection_callback(): + # This callback will be invoked as soon as the socket connection is established + # subscribe to global event updates for ETH market + status = await client.socket.subscribe_global_updates_by_symbol(MARKET_SYMBOLS.ETH) + print("Subscribed to global ETH events: {}".format(status)) + + # triggered when orderbook updates are received + print("Listening to orderbook updates") + await client.socket.listen(SOCKET_EVENTS.ORDERBOOK_UPDATE.value, callback) + + async def disconnection_callback(): + print("Sockets disconnected, performing actions...") + resp = await client.cancel_all_orders(MARKET_SYMBOLS.ETH, [ORDER_STATUS.OPEN, ORDER_STATUS.PARTIAL_FILLED]) + print(resp) + + +# must specify connection_callback before opening the sockets below + await client.socket.listen("connect", connection_callback) + await client.socket.listen("disconnect", disconnection_callback) + + print("Making socket connection to bluefin exchange") await client.socket.open() - # subscribe to global event updates for ETH market - await client.socket.subscribe_global_updates_by_symbol(MARKET_SYMBOLS.ETH) - print("Subscribed to ETH Market events") + ######## Placing an Order ######## + + # default leverage of account is set to 3 on bluefin + user_leverage = await client.get_user_leverage(MARKET_SYMBOLS.ETH) - print("Listening to ETH Orderbook update event") - await client.socket.listen(SOCKET_EVENTS.ORDERBOOK_UPDATE.value, callback) + # creates a MARKET order to be signed + signature_request = OrderSignatureRequest( + symbol=MARKET_SYMBOLS.ETH, + leverage=user_leverage, + price=1300, + quantity=0.5, + side=ORDER_SIDE.BUY, + orderType=ORDER_TYPE.LIMIT + ) - await place_limit_order(client) + # create signed order + signed_order = client.create_signed_order(signature_request) - timeout = 30 + print("Placing a market order") + # place signed order on orderbook + resp = await client.post_signed_order(signed_order) + print(resp) + ###### Closing socket connections after 30 seconds ##### + timeout = 10 end_time = time.time() + timeout while not event_received and time.time() < end_time: time.sleep(1) - status = await client.socket.unsubscribe_global_updates_by_symbol( - MARKET_SYMBOLS.ETH - ) - print("Unsubscribed from orderbook update events for ETH Market: {}".format(status)) - # close socket connection + # # close socket connection print("Closing sockets!") await client.socket.close() - await client.close_connections() - if __name__ == "__main__": + ### make sure keep the loop initialization same + # as below to ensure closing the script after receiving + # completion of each callback on socket events ### loop = asyncio.new_event_loop() - loop.run_until_complete(main()) + loop.create_task(main()) + pending = asyncio.all_tasks(loop=loop) + group = asyncio.gather(*pending) + loop.run_until_complete(group) loop.close() diff --git a/pyproject.toml b/pyproject.toml index 567e2d5..ca4af0c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "bluefin_v2_client" -version = "2.6.0" +version = "2.7.0" description = "Library to interact with Bluefin exchange protocol including its off-chain api-gateway and on-chain contracts" readme = "README.md" requires-python = ">=3.8" diff --git a/src/bluefin_v2_client/sockets_lib.py b/src/bluefin_v2_client/sockets_lib.py index 3a4f2f0..f98a908 100644 --- a/src/bluefin_v2_client/sockets_lib.py +++ b/src/bluefin_v2_client/sockets_lib.py @@ -2,8 +2,7 @@ import time from .enumerations import MARKET_SYMBOLS, SOCKET_EVENTS import asyncio - -sio = socketio.Client() +sio = socketio.AsyncClient() class Sockets: @@ -16,37 +15,38 @@ def __init__(self, url, timeout=10, token=None) -> None: self.api_token = "" return - def _establish_connection(self): + async def _establish_connection(self): """ - Connects to the desired url + Connects to the desired url """ try: - sio.connect(self.url, wait_timeout=self.timeout, transports=["websocket"]) + await sio.connect(self.url, wait_timeout=self.timeout, + transports=["websocket"]) return True except: return False def set_token(self, token): """ - Sets default user token - Inputs: - - token (user auth token): Bluefin onboarding token. + Sets default user token + Inputs: + - token (user auth token): Bluefin onboarding token. """ self.token = token def set_api_token(self, token): """ - Sets default user token - Inputs: - - token (user auth token): Bluefin onboarding token. + Sets default user token + Inputs: + - token (user auth token): Bluefin onboarding token. """ self.api_token = token async def open(self): """ - opens socket instance connection + opens socket instance connection """ - self.connection_established = self._establish_connection() + self.connection_established = await self._establish_connection() if not self.connection_established: await self.close() raise (Exception("Failed to connect to Host: {}".format(self.url))) @@ -54,21 +54,21 @@ async def open(self): async def close(self): """ - closes the socket instance connection + closes the socket instance connection """ - sio.disconnect() + await sio.disconnect() return @sio.on("*") - def listener(event, data): + async def listener(event, data): """ - Listens to all events emitted by the server + Listens to all events emitted by the server """ try: if event in Sockets.callbacks.keys(): - Sockets.callbacks[event](data) + await Sockets.callbacks[event](data) elif "default" in Sockets.callbacks.keys(): - Sockets.callbacks["default"]({"event": event, "data": data}) + await Sockets.callbacks["default"]({"event": event, "data": data}) else: pass except: @@ -76,49 +76,39 @@ def listener(event, data): return @sio.event - def connect(): + async def connect(): print("Connected To Socket Server") - # add 10 seconds sleep to allow connection to be established before callbacks for connections are executed - if "connect" in Sockets.callbacks: + if 'connect' in Sockets.callbacks: # Execute the callback using asyncio.run() if available - time.sleep(10) - asyncio.run(Sockets.callbacks["connect"]()) + await Sockets.callbacks['connect']() @sio.event - def disconnect(): - print("Disconnected From Socket Server") - if "disconnect" in Sockets.callbacks: + async def disconnect(): + print('Disconnected From Socket Server') + if 'disconnect' in Sockets.callbacks: # Execute the callback using asyncio.run() if available - asyncio.run(Sockets.callbacks["disconnect"]()) + await Sockets.callbacks['disconnect']() async def listen(self, event, callback): """ - Assigns callbacks to desired events + Assigns callbacks to desired events """ Sockets.callbacks[event] = callback return async def subscribe_global_updates_by_symbol(self, symbol: MARKET_SYMBOLS): """ - Allows user to subscribe to global updates for the desired symbol. - Inputs: - - symbol: market symbol of market user wants global updates for. (e.g. DOT-PERP) + Allows user to subscribe to global updates for the desired symbol. + Inputs: + - symbol: market symbol of market user wants global updates for. (e.g. DOT-PERP) """ try: - if not self.connection_established: - raise Exception( - "Socket connection is established, invoke socket.open()" - ) - - resp = sio.call( - "SUBSCRIBE", - [ - { - "e": SOCKET_EVENTS.GLOBAL_UPDATES_ROOM.value, - "p": symbol.value, - }, - ], - ) + resp = await sio.call('SUBSCRIBE', [ + { + "e": SOCKET_EVENTS.GLOBAL_UPDATES_ROOM.value, + "p": symbol.value, + }, + ]) return resp["success"] except Exception as e: print("Error: ", e) @@ -126,84 +116,62 @@ async def subscribe_global_updates_by_symbol(self, symbol: MARKET_SYMBOLS): async def unsubscribe_global_updates_by_symbol(self, symbol: MARKET_SYMBOLS): """ - Allows user to unsubscribe to global updates for the desired symbol. - Inputs: - - symbol: market symbol of market user wants to remove global updates for. (e.g. DOT-PERP) + Allows user to unsubscribe to global updates for the desired symbol. + Inputs: + - symbol: market symbol of market user wants to remove global updates for. (e.g. DOT-PERP) """ try: - if not self.connection_established: - return False - - resp = sio.call( - "UNSUBSCRIBE", - [ - { - "e": SOCKET_EVENTS.GLOBAL_UPDATES_ROOM.value, - "p": symbol.value, - }, - ], - ) + resp = await sio.call('UNSUBSCRIBE', [ + { + "e": SOCKET_EVENTS.GLOBAL_UPDATES_ROOM.value, + "p": symbol.value, + }, + ]) return resp["success"] except Exception as e: print(e) return False - async def subscribe_user_update_by_token( - self, parent_account: str = None, user_token: str = None - ) -> bool: + async def subscribe_user_update_by_token(self, parent_account: str = None, user_token: str = None) -> bool: """ - Allows user to subscribe to their account updates. - Inputs: - - parent_account(str): address of parent account. Only whitelisted - sub-account can listen to its parent account position updates - - token(str): auth token generated when onboarding on Bluefin + Allows user to subscribe to their account updates. + Inputs: + - parent_account(str): address of parent account. Only whitelisted + sub-account can listen to its parent account position updates + - token(str): auth token generated when onboarding on Bluefin """ try: - if not self.connection_established: - return False - - resp = sio.call( - "SUBSCRIBE", - [ - { - "e": SOCKET_EVENTS.USER_UPDATES_ROOM.value, - "pa": parent_account, - "t": self.token if user_token == None else user_token, - "rt": self.api_token, - }, - ], - ) + resp = await sio.call("SUBSCRIBE", [ + { + "e": SOCKET_EVENTS.USER_UPDATES_ROOM.value, + 'pa': parent_account, + "t": self.token if user_token == None else user_token, + "rt": self.api_token + }, + ]) return resp["success"] except Exception as e: print(e) return False - async def unsubscribe_user_update_by_token( - self, parent_account: str = None, user_token: str = None - ): + async def unsubscribe_user_update_by_token(self, parent_account: str = None, user_token: str = None): """ - Allows user to unsubscribe to their account updates. - Inputs: - - parent_account(str): address of parent account. Only for sub-accounts - - token: auth token generated when onboarding on Bluefin + Allows user to unsubscribe to their account updates. + Inputs: + - parent_account(str): address of parent account. Only for sub-accounts + - token: auth token generated when onboarding on Bluefin """ try: - if not self.connection_established: - return False - - resp = sio.call( - "UNSUBSCRIBE", - [ - { - "e": SOCKET_EVENTS.USER_UPDATES_ROOM.value, - "pa": parent_account, - "t": self.token if user_token == None else user_token, - "rt": self.api_token, - }, - ], - ) + resp = await sio.call("UNSUBSCRIBE", [ + { + "e": SOCKET_EVENTS.USER_UPDATES_ROOM.value, + 'pa': parent_account, + "t": self.token if user_token == None else user_token, + "rt": self.api_token + }, + ]) return resp["success"] except: return False