Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 14 additions & 35 deletions pocketoptionapi_async/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def _setup_event_handlers(self):
self._websocket.add_event_handler("disconnected", self._on_disconnected)

async def connect(
self, regions: Optional[List[str]] = None, persistent: bool = None
self, regions: Optional[List[str]] = None, persistent: Optional[bool] = None
) -> bool:
"""
Connect to PocketOption with multiple region support
Expand All @@ -162,7 +162,7 @@ async def connect(
logger.info("Connecting to PocketOption...")
# Update persistent setting if provided
if persistent is not None:
self.persistent_connection = persistent
self.persistent_connection = bool(persistent)

try:
if self.persistent_connection:
Expand Down Expand Up @@ -1181,39 +1181,31 @@ async def _on_candles_received(self, data: Dict[str, Any]) -> None:
logger.info(f"🕯️ Candles received with data: {type(data)}")
# Check if we have pending candle requests
if hasattr(self, "_candle_requests") and self._candle_requests:
# Parse the candles data
try:
# Get the first pending request to extract asset and timeframe info
for request_id, future in list(self._candle_requests.items()):
if not future.done():
# Extract asset and timeframe from request_id format: "asset_timeframe"
parts = request_id.split("_")
if len(parts) >= 2:
asset = "_".join(
parts[:-1]
) # Handle assets with underscores
asset = "_".join(parts[:-1])
timeframe = int(parts[-1])

candles = self._parse_candles_data(data, asset, timeframe)
candles = self._parse_candles_data(
data.get("candles", []), asset, timeframe
)
if self.enable_logging:
logger.info(
f"🕯️ Parsed {len(candles)} candles from response"
)

future.set_result(candles)
if self.enable_logging:
logger.debug(f"Resolved candle request: {request_id}")
break

except Exception as e:
if self.enable_logging:
logger.error(f"Error processing candles data: {e}")
# Resolve futures with empty result
for request_id, future in list(self._candle_requests.items()):
if not future.done():
future.set_result([])
break

await self._emit_event("candles_received", data)

async def _on_disconnected(self, data: Dict[str, Any]) -> None:
Expand All @@ -1227,79 +1219,66 @@ async def _handle_candles_stream(self, data: Dict[str, Any]) -> None:
try:
asset = data.get("asset")
period = data.get("period")

if not asset or not period:
return

request_id = f"{asset}_{period}"

if self.enable_logging:
logger.info(f"🕯️ Processing candle stream for {asset} ({period}s)")

# Check if we have a pending request for this asset/period
if (
hasattr(self, "_candle_requests")
and request_id in self._candle_requests
):
future = self._candle_requests[request_id]

if not future.done():
# Parse candles from stream data
candles = self._parse_stream_candles(data)
candles = self._parse_stream_candles(data, asset, period)
if candles:
future.set_result(candles)
if self.enable_logging:
logger.info(
f"🕯️ Resolved candle request for {asset} with {len(candles)} candles"
)

# Clean up the request
del self._candle_requests[request_id]

except Exception as e:
if self.enable_logging:
logger.error(f"Error handling candles stream: {e}")

def _parse_stream_candles(self, stream_data: Dict[str, Any]):
def _parse_stream_candles(
self, stream_data: Dict[str, Any], asset: str, timeframe: int
):
"""Parse candles from stream update data (changeSymbol response)"""
candles = []

try:
# Stream data might contain candles in different formats
candle_data = stream_data.get("data") or stream_data.get("candles") or []

if isinstance(candle_data, list):
for item in candle_data:
if isinstance(item, dict):
# Dict format
candle = Candle(
timestamp=datetime.fromtimestamp(item.get("time", 0)),
open=float(item.get("open", 0)),
high=float(item.get("high", 0)),
low=float(item.get("low", 0)),
close=float(item.get("close", 0)),
volume=float(item.get("volume", 0)),
asset=asset,
timeframe=timeframe,
)
candles.append(candle)
elif isinstance(item, (list, tuple)) and len(item) >= 6:
# Array format: [timestamp, open, close, high, low, volume]
candle = Candle(
timestamp=datetime.fromtimestamp(item[0]),
open=float(item[1]),
high=float(item[3]),
low=float(item[4]),
close=float(item[2]),
volume=float(item[5]) if len(item) > 5 else 0.0,
asset=asset,
timeframe=timeframe,
)
candles.append(candle)

# Sort by timestamp
candles.sort(key=lambda x: x.timestamp)

except Exception as e:
if self.enable_logging:
logger.error(f"Error parsing stream candles: {e}")

return candles

async def _on_keep_alive_connected(self):
Expand Down
34 changes: 28 additions & 6 deletions pocketoptionapi_async/connection_keep_alive.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from typing import Optional, List, Callable, Dict, Any
from datetime import datetime, timedelta
from loguru import logger
import websockets
from websockets.exceptions import ConnectionClosed
from websockets.legacy.client import connect, WebSocketClientProtocol

from models import ConnectionInfo, ConnectionStatus
from constants import REGIONS
Expand All @@ -23,7 +23,7 @@ def __init__(self, ssid: str, is_demo: bool = True):
self.is_demo = is_demo

# Connection state
self.websocket: Optional[websockets.WebSocketServerProtocol] = None
self.websocket: Optional[WebSocketClientProtocol] = None
self.connection_info: Optional[ConnectionInfo] = None
self.is_connected = False
self.should_reconnect = True
Expand Down Expand Up @@ -138,7 +138,7 @@ async def _establish_connection(self) -> bool:

# Connect with headers (like old API)
self.websocket = await asyncio.wait_for(
websockets.connect(
connect(
url,
ssl=ssl_context,
extra_headers={
Expand Down Expand Up @@ -198,6 +198,8 @@ async def _establish_connection(self) -> bool:
async def _send_handshake(self):
"""Send initial handshake sequence (like old API)"""
try:
if not self.websocket:
raise RuntimeError("Handshake called with no websocket connection.")
# Wait for initial connection message
initial_message = await asyncio.wait_for(
self.websocket.recv(), timeout=10.0
Expand Down Expand Up @@ -408,9 +410,10 @@ async def _process_message(self, message):

# Handle ping-pong (like old API)
if message == "2":
await self.websocket.send("3")
self.connection_stats["last_pong_time"] = datetime.now()
logger.debug("Ping: Pong sent")
if self.websocket:
await self.websocket.send("3")
self.connection_stats["last_pong_time"] = datetime.now()
logger.debug("Ping: Pong sent")
return

# Handle authentication success (like old API)
Expand Down Expand Up @@ -490,6 +493,25 @@ def get_connection_stats(self) -> Dict[str, Any]:
"available_regions": len(self.available_urls),
}

async def connect_with_keep_alive(
self, regions: Optional[List[str]] = None
) -> bool:
"""Establish a persistent connection with keep-alive, optionally using a list of regions."""
# Optionally update available_urls if regions are provided
if regions:
# Assume regions are URLs or region names; adapt as needed
self.available_urls = regions
self.current_url_index = 0
return await self.start_persistent_connection()

async def disconnect(self) -> None:
"""Disconnect and clean up persistent connection."""
await self.stop_persistent_connection()

def get_stats(self) -> Dict[str, Any]:
"""Return connection statistics (alias for get_connection_stats)."""
return self.get_connection_stats()


async def demo_keep_alive():
"""Demo of the keep-alive connection manager"""
Expand Down
4 changes: 2 additions & 2 deletions pocketoptionapi_async/connection_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ def generate_diagnostics_report(self) -> Dict[str, Any]:

return report

def export_metrics_csv(self, filename: str = None) -> str:
def export_metrics_csv(self, filename: str = "") -> str:
"""Export metrics to CSV file"""
if not filename:
filename = f"metrics_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
Expand Down Expand Up @@ -723,7 +723,7 @@ async def _display_loop(self):
await asyncio.sleep(1)


async def run_monitoring_demo(ssid: str = None):
async def run_monitoring_demo(ssid: Optional[str] = None):
"""Run monitoring demonstration"""

if not ssid:
Expand Down
4 changes: 3 additions & 1 deletion pocketoptionapi_async/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,10 @@ def get_all_regions(cls) -> Dict[str, str]:
"""Get all regions as a dictionary"""
return cls._REGIONS.copy()

from typing import Optional

Comment on lines +181 to +182
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Move the import to module level.

The Optional import should be placed at the top of the file with other imports, not inside the class definition. This is against Python conventions and could cause issues.

-from typing import Dict, List
+from typing import Dict, List, Optional

And remove the import from inside the class:

-    from typing import Optional
-
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
from typing import Optional
# At the top of pocketoptionapi_async/constants.py
from typing import Dict, List, Optional
# … other imports …
class Constants:
# (Removed the inline `from typing import Optional`)
def get_region(self) -> Optional[str]:
# existing implementation…
...
🤖 Prompt for AI Agents
In pocketoptionapi_async/constants.py around lines 181 to 182, the import of
Optional is inside the class definition which violates Python conventions. Move
the import statement for Optional to the top of the file with the other imports
and remove the import from inside the class to follow proper module-level import
practices.

@classmethod
def get_region(cls, region_name: str) -> str:
def get_region(cls, region_name: str) -> Optional[str]:
"""Get specific region URL"""
return cls._REGIONS.get(region_name.upper())

Expand Down
4 changes: 3 additions & 1 deletion pocketoptionapi_async/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
class PocketOptionError(Exception):
"""Base exception for all PocketOption API errors"""

def __init__(self, message: str, error_code: str = None):
from typing import Optional

def __init__(self, message: str, error_code: Optional[str] = None):
Comment on lines +9 to +11
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Move import to module level.

The Optional import should be at the module level, not inside the class definition. This violates Python conventions and could cause import issues.

Apply this diff to fix the import placement:

+from typing import Optional
+
+
 class PocketOptionError(Exception):
     """Base exception for all PocketOption API errors"""
 
-    from typing import Optional
-
-    def __init__(self, message: str, error_code: Optional[str] = None):
+    def __init__(self, message: str, error_code: Optional[str] = None):
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
from typing import Optional
def __init__(self, message: str, error_code: Optional[str] = None):
from typing import Optional
class PocketOptionError(Exception):
"""Base exception for all PocketOption API errors"""
def __init__(self, message: str, error_code: Optional[str] = None):
# existing initialization logic...
super().__init__(message)
self.error_code = error_code
🤖 Prompt for AI Agents
In pocketoptionapi_async/exceptions.py around lines 9 to 11, the import of
Optional is currently inside the class or function scope. Move the import
statement for Optional to the top of the module, outside and above any class or
function definitions, to follow Python conventions and avoid import issues.

super().__init__(message)
self.message = message
self.error_code = error_code
Expand Down
22 changes: 15 additions & 7 deletions pocketoptionapi_async/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,13 @@ class PerformanceMetrics:
class CircuitBreaker:
"""Circuit breaker pattern implementation"""

from typing import Type

def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: int = 60,
expected_exception: type = Exception,
expected_exception: Type[BaseException] = Exception,
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
Expand All @@ -79,7 +81,10 @@ def __init__(
async def call(self, func: Callable, *args, **kwargs):
"""Execute function with circuit breaker protection"""
if self.state == "OPEN":
if time.time() - self.last_failure_time < self.recovery_timeout:
if (
self.last_failure_time is not None
and time.time() - self.last_failure_time < self.recovery_timeout
):
raise Exception("Circuit breaker is OPEN")
else:
self.state = "HALF_OPEN"
Expand Down Expand Up @@ -155,7 +160,10 @@ async def execute(self, func: Callable, *args, **kwargs):
)
await asyncio.sleep(delay)

raise last_exception
if last_exception is not None:
raise last_exception
else:
raise Exception("RetryPolicy failed but no exception was captured.")


class ErrorMonitor:
Expand Down Expand Up @@ -197,8 +205,8 @@ async def record_error(
severity: ErrorSeverity,
category: ErrorCategory,
message: str,
context: Dict[str, Any] = None,
stack_trace: str = None,
context: Optional[Dict[str, Any]] = None,
stack_trace: Optional[str] = None,
):
"""Record an error event"""
error_event = ErrorEvent(
Expand All @@ -208,7 +216,7 @@ async def record_error(
category=category,
message=message,
context=context or {},
stack_trace=stack_trace,
stack_trace=stack_trace or "",
)

self.errors.append(error_event)
Expand Down Expand Up @@ -340,7 +348,7 @@ async def execute_with_monitoring(
"args": str(args)[:200], # Truncate for security
"kwargs": str({k: str(v)[:100] for k, v in kwargs.items()})[:200],
},
stack_trace=None, # Could add traceback.format_exc() here
stack_trace="", # Could add traceback.format_exc() here
)

raise e
Expand Down
Loading