Skip to content
This repository was archived by the owner on Feb 21, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Services/Services_bases/bird_service/metadata.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"version": "1.0.0",
"version": "1.2.0",
"origin_package": "OctoBot-Default-Tentacles",
"tentacles": ["BirdService"],
"tentacles-requirements": []
Expand Down
2 changes: 1 addition & 1 deletion Services/Services_bases/tavily_service/metadata.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"version": "1.0.0",
"version": "1.2.0",
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

"origin_package": "OctoBot-Default-Tentacles",
"tentacles": ["TavilyService"],
"tentacles-requirements": []
Expand Down
193 changes: 91 additions & 102 deletions Trading/Exchange/polymarket/ccxt/polymarket_async.py

Large diffs are not rendered by default.

177 changes: 119 additions & 58 deletions Trading/Exchange/polymarket/ccxt/polymarket_pro.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
from ccxt.base.types import Any, Int, Order, OrderBook, Str, Ticker, Trade
from ccxt.async_support.base.ws.client import Client
from typing import List
from ccxt.base.errors import ArgumentsRequired, ExchangeError
from ccxt.base.errors import ExchangeError
from ccxt.base.errors import ArgumentsRequired


class polymarket(polymarket):
Expand All @@ -26,6 +27,7 @@ def describe(self) -> Any:
'watchOrders': True,
'watchOrderBook': True,
'watchOHLCV': False,
'watchMarkets': True,
},
'urls': {
'api': {
Expand All @@ -37,8 +39,11 @@ def describe(self) -> Any:
},
},
'options': {
'wsMarketChannelType': 'market',
'wsUserChannelType': 'user',
'wsMarketChannelType': 'MARKET',
'wsUserChannelType': 'USER',
'headers': {'Origin': 'https://polymarket.com'},
'ping': True,
'keepalive': True,
},
'streaming': {
},
Expand Down Expand Up @@ -137,13 +142,34 @@ async def watch_ticker(self, symbol: str, params={}) -> Ticker:
request: dict = {
'type': self.options['wsMarketChannelType'],
'assets_ids': [assetId],
'custom_feature_enabled': False,
}
subscription: dict = {
'symbol': symbol,
'asset_id': assetId,
}
return await self.watch(url, messageHash, request, messageHash, subscription)

async def watch_markets(self, params={}) -> Any:
"""
watches for new and resolved markets
:param dict [params]: extra parameters specific to the exchange API endpoint
:param boolean [params.custom_feature_enabled]: enable custom features like market updates(default False)
:returns dict: A dictionary of `market structures <https://docs.ccxt.com/#/?id=market-structure>` indexed by market symbols
"""
await self.load_markets()
customFeatureEnabled = self.safe_bool(params, 'custom_feature_enabled', False)
url = self.urls['api']['ws']['market']
messageHash = 'markets'
request: dict = {
'type': self.options['wsMarketChannelType'],
'custom_feature_enabled': customFeatureEnabled,
}
subscription: dict = {
'custom_feature_enabled': customFeatureEnabled,
}
return await self.watch(url, messageHash, request, messageHash, subscription)

async def watch_orders(self, symbol: Str = None, since: Int = None, limit: Int = None, params={}) -> List[Order]:
"""
watches information on an order made by the user
Expand Down Expand Up @@ -289,7 +315,7 @@ def handle_order_book(self, client: Client, message):
orderbook['datetime'] = datetime
client.resolve(orderbook, messageHash)

def handle_price_change_orderbook(self, client: Client, message):
def handle_price_change_order_book(self, client: Client, message):
#
# Market websocket price_change event for orderbook updates:
# {
Expand Down Expand Up @@ -696,14 +722,24 @@ def handle_market_event(self, client: Client, message: Any, eventType: str):
self.handle_trades(client, message)
elif eventType == 'price_change':
# price_change updates both orderbook and ticker
self.handle_price_change_orderbook(client, message)
self.handle_price_change_order_book(client, message)
self.handle_ticker(client, message)
elif eventType == 'last_trade_price':
self.handle_ticker(client, message)
elif eventType == 'tick_size_change':
# Tick size change - can be used to update ticker
if self.verbose:
self.log('Tick size change event:', message)
elif eventType == 'new_market':
marketsSubscription = self.safe_value(client.subscriptions, 'markets')
customFeatureEnabled = self.safe_bool(marketsSubscription, 'custom_feature_enabled', False) if marketsSubscription else False
if customFeatureEnabled:
self.handle_new_market(client, message)
elif eventType == 'market_resolved':
marketsSubscription = self.safe_value(client.subscriptions, 'markets')
customFeatureEnabled = self.safe_bool(marketsSubscription, 'custom_feature_enabled', False) if marketsSubscription else False
if customFeatureEnabled:
self.handle_market_resolved(client, message)
else:
# Unknown event type, log but don't error
if self.verbose:
Expand All @@ -719,6 +755,36 @@ def handle_user_event(self, client: Client, message: Any, eventType: str):
if self.verbose:
self.log('Unknown user websocket event type:', eventType, message)

def handle_new_market(self, client: Client, message):
# Handle new market event
# Docs: https://docs.polymarket.com/developers/CLOB/websocket/market-channel#new_market-message
marketData = self.safe_dict(message, 'market', {})
marketId = self.safe_string(marketData, 'id')
if marketId:
# Update marketUpdates cache
if not self.marketUpdates:
self.marketUpdates = {}
self.marketUpdates[marketId] = {
'event_type': 'new_market',
'market': marketData,
}
client.resolve(self.marketUpdates, 'markets')

def handle_market_resolved(self, client: Client, message):
# Handle market resolved event
# Docs: https://docs.polymarket.com/developers/CLOB/websocket/market-channel#market_resolved-message
marketData = self.safe_dict(message, 'market', {})
marketId = self.safe_string(marketData, 'id')
if marketId:
# Update marketUpdates cache
if not self.marketUpdates:
self.marketUpdates = {}
self.marketUpdates[marketId] = {
'event_type': 'market_resolved',
'market': marketData,
}
client.resolve(self.marketUpdates, 'markets')

async def authenticate(self, params={}):
url = self.urls['api']['ws']['user']
client = self.client(url)
Expand Down Expand Up @@ -746,116 +812,111 @@ async def watch(self, url: str, messageHash: str, message=None, subscribeHash=No
client = self.client(url)
if subscribeHash is None:
subscribeHash = messageHash

# Handle market channel subscriptions with dynamic subscribe/unsubscribe (following onetrading pattern)
# Handle market channel subscriptions with dynamic subscribe/unsubscribe(following onetrading pattern)
if subscription is not None and url.find('/ws/market') >= 0:
# Use wsMarketChannelType for subscription hash
channelSubscriptionHash = self.options['wsMarketChannelType']

# Extract asset_id from message or subscription
assetId = None
assetId: str | None = None
if message and isinstance(message, dict):
assets_ids = self.safe_value(message, 'assets_ids', [])
if isinstance(assets_ids, list) and len(assets_ids) > 0:
assetId = assets_ids[0]
assetsIds = self.safe_value(message, 'assets_ids', [])
if isinstance(assetsIds, list) and len(assetsIds) > 0:
assetId = self.safe_string(assetsIds, 0)
if assetId is None:
assetId = self.safe_string_2(subscription, 'asset_id', 'token_id')

if assetId:
if assetId is not None:
# Get existing subscription or create new one
channelSubscription = self.safe_value(client.subscriptions, channelSubscriptionHash, {})
if not isinstance(channelSubscription, dict):
if not isinstance(channelSubscription, dict) or isinstance(channelSubscription, list):
channelSubscription = {}

# Check if we're already connected
is_connected = client.connected.done()

isConnected = client.connection and client.connection.readyState == 1 # WebSocket.OPEN = 1
# Check if asset_id needs to be added
needs_subscribe = assetId not in channelSubscription

if is_connected and needs_subscribe:
needsSubscribe = not (assetId in channelSubscription)
if isConnected and needsSubscribe:
# Connection exists and asset_id not subscribed - use dynamic subscribe
await self.subscribe_to_asset_ids([assetId])

# Add asset_id to subscription tracking
channelSubscription[assetId] = True
client.subscriptions[channelSubscriptionHash] = channelSubscription

# Build message with all subscribed asset_ids (following onetrading pattern)
all_asset_ids = list(channelSubscription.keys())
# Build message with all subscribed asset_ids(following onetrading pattern)
allAssetIds = list(channelSubscription.keys())
if message and isinstance(message, dict):
message['assets_ids'] = all_asset_ids
message['assets_ids'] = allAssetIds
message['type'] = self.options['wsMarketChannelType']

# Store individual subscription info for message routing
if not (subscribeHash in client.subscriptions):
client.subscriptions[subscribeHash] = subscription

return await super(polymarket, self).watch(url, messageHash, message, subscribeHash, subscription)
params = {}
params['headers'] = self.options['headers']
params['ping'] = self.options['ping']
params['keepalive'] = self.options['keepalive']
return await super(polymarket, self).watch(url, messageHash, message, subscribeHash, subscription, params)

async def subscribe_to_asset_ids(self, asset_ids: List[str], params={}):
"""
Dynamically subscribe to additional asset IDs on an existing market channel connection
:param List[str] asset_ids: list of asset IDs to subscribe to
:param str[] asset_ids: list of asset IDs to subscribe to
:param dict [params]: extra parameters
:returns: None
:returns Promise<void>:
"""
url = self.urls['api']['ws']['market']
client = self.client(url)
if not client.connected.done():
raise ExchangeError(self.id + ' subscribe_to_asset_ids() requires an active WebSocket connection')

if not (client.connection and client.connection.readyState == 1):
raise ExchangeError(self.id + ' subscribeToAssetIds() requires an active WebSocket connection')
channelSubscriptionHash = self.options['wsMarketChannelType']
channelSubscription = self.safe_value(client.subscriptions, channelSubscriptionHash, {})
if not isinstance(channelSubscription, dict):
if not isinstance(channelSubscription, dict) or isinstance(channelSubscription, list):
channelSubscription = {}

# Filter out already subscribed asset_ids
new_asset_ids = [aid for aid in asset_ids if aid not in channelSubscription]
if len(new_asset_ids) == 0:
newAssetIds = []
for i in range(0, len(asset_ids)):
aid = asset_ids[i]
if not (aid in channelSubscription):
newAssetIds.append(aid)
if len(newAssetIds) == 0:
return # All already subscribed

request: dict = {
'assets_ids': new_asset_ids,
'assets_ids': newAssetIds,
'operation': 'subscribe',
}
await client.send(request)

# Track newly subscribed asset_ids
for aid in new_asset_ids:
for i in range(0, len(newAssetIds)):
aid = newAssetIds[i]
channelSubscription[aid] = True
client.subscriptions[channelSubscriptionHash] = channelSubscription

async def unsubscribe_from_asset_ids(self, asset_ids: List[str], params={}):
"""
Dynamically unsubscribe from asset IDs on an existing market channel connection
:param List[str] asset_ids: list of asset IDs to unsubscribe from
:param str[] asset_ids: list of asset IDs to unsubscribe from
:param dict [params]: extra parameters
:returns: None
:returns Promise<void>:
"""
url = self.urls['api']['ws']['market']
client = self.client(url)
if not client.connected.done():
raise ExchangeError(self.id + ' unsubscribe_from_asset_ids() requires an active WebSocket connection')

if not (client.connection and client.connection.readyState == 1):
raise ExchangeError(self.id + ' unsubscribeFromAssetIds() requires an active WebSocket connection')
channelSubscriptionHash = self.options['wsMarketChannelType']
channelSubscription = self.safe_value(client.subscriptions, channelSubscriptionHash, {})
if not isinstance(channelSubscription, dict):
if not isinstance(channelSubscription, dict) or isinstance(channelSubscription, list):
channelSubscription = {}

# Filter to only unsubscribe from actually subscribed asset_ids
subscribed_asset_ids = [aid for aid in asset_ids if aid in channelSubscription]
if len(subscribed_asset_ids) == 0:
subscribedAssetIds = []
for i in range(0, len(asset_ids)):
aid = asset_ids[i]
if aid in channelSubscription:
subscribedAssetIds.append(aid)
if len(subscribedAssetIds) == 0:
return # None are subscribed

request: dict = {
'assets_ids': subscribed_asset_ids,
'operation': 'unsubscribe',
}
request = {}
request['assets_ids'] = subscribedAssetIds
request['operation'] = 'unsubscribe'
await client.send(request)

# Remove from tracking
for aid in subscribed_asset_ids:
for i in range(0, len(subscribedAssetIds)):
aid = subscribedAssetIds[i]
del channelSubscription[aid]
client.subscriptions[channelSubscriptionHash] = channelSubscription

Expand Down
Loading