#!/usr/bin/env python3
"""
async_place_demo_order.py

Usage:
    source .venv/bin/activate
    python async_place_demo_order.py

This script:
- Loads SSID from .env
- Connects with AsyncPocketOptionClient
- Attempts to place a small demo order using several common call signatures
- Prints responses and checks active orders

All configuration/IO happens inside main(); importing this module has no
side effects (the original ran load_dotenv and could raise SystemExit at
import time).
"""
import asyncio
import os

# Configure order params (change asset/duration/amount to what your demo account supports)
AMOUNT = 1.0
ASSET = "EURUSD"    # update if needed (use an asset visible in demo UI)
DIRECTION = "call"  # or "put"
DURATION = 60       # seconds or API-specific timeframe

# Candidate order-method names, in priority order.
CANDIDATE_METHODS = (
    "place_order",
    "buy",
    "create_order",
    "trade",
    "_send_order",  # internal but present in some versions
)

# Common keyword-argument spellings used by different forks of the API.
KW_VARIANTS = (
    {"amount": AMOUNT, "asset": ASSET, "direction": DIRECTION, "duration": DURATION},
    {"amount": AMOUNT, "instrument": ASSET, "direction": DIRECTION, "duration": DURATION},
    {"amount": AMOUNT, "asset": ASSET, "type": "binary", "duration": DURATION, "direction": DIRECTION},
    {"value": AMOUNT, "asset": ASSET, "side": DIRECTION, "duration": DURATION},
)


async def try_call(fn, *args, **kwargs):
    """Await fn(*args, **kwargs) and report the outcome.

    Returns (True, result) on success, or (False, exception) on any failure.
    A TypeError usually indicates a signature mismatch; anything else is a
    genuine API error.
    """
    try:
        return True, await fn(*args, **kwargs)
    except Exception as e:
        return False, e


def _load_ssid():
    """Read SSID from .env / the environment; exit with a clear message if missing."""
    # Imported lazily so importing this module never touches the filesystem.
    from dotenv import load_dotenv

    load_dotenv()
    ssid = os.getenv("SSID")
    if not ssid:
        raise SystemExit("SSID not found in .env. Add SSID=... (demo SSID, isDemo:1).")
    return ssid


async def _place_order(client):
    """Try every candidate method/signature on *client*.

    Returns (tried, order_result): ``tried`` is True when at least one
    candidate method existed; ``order_result`` is the first successful
    response, or None when nothing succeeded.
    """
    tried = False
    for method_name in CANDIDATE_METHODS:
        if not hasattr(client, method_name):
            continue
        tried = True
        method = getattr(client, method_name)

        print(f"Trying method: {method_name} (positional)")
        ok, res = await try_call(method, AMOUNT, ASSET, DURATION, DIRECTION)
        if ok:
            print(f"Success (positional) with {method_name} ->", res)
            return tried, res

        # Positional call failed -> try common keyword variations.
        for kw in KW_VARIANTS:
            print(f"Trying {method_name} with kwargs: {list(kw.keys())}")
            ok, res = await try_call(method, **kw)
            if ok:
                print(f"Success (kw) with {method_name} ->", res)
                return tried, res
        print(f"{method_name} didn't accept attempted signatures; last error:", res)

    if not tried:
        print("No candidate order methods found on client.")
    return tried, None


async def _post_order_checks(client, order_result):
    """Verify the order and list active orders, using whichever helpers exist."""
    try:
        if hasattr(client, "check_order_result"):
            print("Calling check_order_result(...) to verify...")
            ok, info = await try_call(client.check_order_result, order_result)
            print("check_order_result:", ok, info)

        if hasattr(client, "get_active_orders"):
            print("Fetching active orders...")
            ok, active = await try_call(client.get_active_orders)
            if ok:
                print("Active orders:", active)
            else:
                print("get_active_orders error:", active)
    except Exception as e:
        print("Post-order checks failed:", e)


async def main():
    # Lazy import keeps module import clean and gives a readable error message.
    try:
        from pocketoptionapi_async.client import AsyncPocketOptionClient
    except Exception as e:
        raise SystemExit("Cannot import AsyncPocketOptionClient: " + str(e))

    client = AsyncPocketOptionClient(_load_ssid())

    print("Connecting...")
    try:
        await client.connect()
    except Exception as e:
        print("Failed to connect:", e)
        return

    print("Connected. Attempting to place order...")
    tried, order_result = await _place_order(client)

    # Post-order checks run whenever a method existed, even if the order failed,
    # matching the original control flow.
    if tried:
        await _post_order_checks(client, order_result)

    # Read balance if the client exposes it.
    try:
        if hasattr(client, "get_balance"):
            print("Balance:", await client.get_balance())
    except Exception as e:
        print("get_balance failed:", e)

    # Disconnect; some forks use close/disconnect variations.
    try:
        await client.disconnect()
    except Exception:
        try:
            await client.close()
        except Exception:
            pass

    print("Done.")


if __name__ == "__main__":
    asyncio.run(main())
+""" +import sys, os, re, shutil + +if len(sys.argv) < 2: + print("Usage: python fix_imports.py /path/to/pocketoptionapi_async") + sys.exit(1) + +pkg_dir = sys.argv[1] +if not os.path.isdir(pkg_dir): + print("Directory not found:", pkg_dir) + sys.exit(2) + +# determine candidate module names in that folder (module.py or package dir) +candidates = set() +for name in os.listdir(pkg_dir): + if name.endswith(".py"): + candidates.add(name[:-3]) + elif os.path.isdir(os.path.join(pkg_dir, name)) and os.path.isfile(os.path.join(pkg_dir, name, "__init__.py")): + candidates.add(name) + +print("Found candidate internal modules:", sorted(candidates)) + +# regex helpers +def replace_in_text(text, mod): + # 1) from import ... + pattern1 = re.compile(rf'(^|\n)(\s*)from\s+{re.escape(mod)}\s+import\s+', flags=re.MULTILINE) + repl1 = rf'\1\2from pocketoptionapi_async.{mod} import ' + text = pattern1.sub(repl1, text) + + # 2) import [as ...] -> convert to `from pocketoptionapi_async import [as ...]` + # but avoid converting "import a, b" or "import pkg.mod" + pattern2 = re.compile(rf'(^|\n)(\s*)import\s+{re.escape(mod)}(\s|$|,| as)', flags=re.MULTILINE) + repl2 = rf'\1\2from pocketoptionapi_async import {mod}\3' + text = pattern2.sub(repl2, text) + + return text + +# walk and patch +patched_files = [] +for root, _, files in os.walk(pkg_dir): + for fname in files: + if not fname.endswith(".py"): + continue + path = os.path.join(root, fname) + with open(path, "r", encoding="utf-8") as f: + text = f.read() + new_text = text + for mod in sorted(candidates, key=lambda x: -len(x)): # longer first + # avoid changing the very file which defines the module if it imports itself + # but it's safe to replace in general + new_text = replace_in_text(new_text, mod) + + if new_text != text: + bak = path + ".bak" + print("Patching", path, "-> backup saved to", bak) + shutil.copy2(path, bak) + with open(path, "w", encoding="utf-8") as f: + f.write(new_text) + patched_files.append(path) + +if not 
patched_files: + print("No files required patching.") +else: + print("Patched files count:", len(patched_files)) + for p in patched_files: + print(" -", p) +print("Done.") diff --git a/inspect_pocket.py b/inspect_pocket.py new file mode 100644 index 0000000..a0aab02 --- /dev/null +++ b/inspect_pocket.py @@ -0,0 +1,47 @@ +# inspect_pocket.py +import importlib, pkgutil, inspect, os +import sys + +# make sure repo root is importable +root = os.path.abspath(os.path.dirname(__file__)) +if root not in sys.path: + sys.path.insert(0, root) + +PKG = "pocketoptionapi_async" + +try: + pkg = importlib.import_module(PKG) +except Exception as e: + print("Failed to import package", PKG, ":", e) + sys.exit(1) + +print("Package imported:", pkg) +print("\nTop-level modules in package:") +for m in pkgutil.iter_modules(pkg.__path__): + print(" -", m.name) + +candidates = [] +print("\nScanning modules for classes and order-like methods (this may take a sec)...\n") +for finder, name, ispkg in pkgutil.walk_packages(pkg.__path__, pkg.__name__ + "."): + try: + mod = importlib.import_module(name) + except Exception as e: + print(" (skip) failed to import", name, ":", e) + continue + for cls_name, cls in inspect.getmembers(mod, inspect.isclass): + # only show classes defined in this package + if cls.__module__.startswith(PKG): + methods = [m for m, _ in inspect.getmembers(cls, inspect.isfunction)] + # look for order-like methods + order_like = [m for m in methods if any(x in m.lower() for x in ("buy", "place", "order", "trade", "create"))] + if order_like: + print(f"Class: {cls.__module__}.{cls_name}") + print(" order-like methods:", order_like) + candidates.append((cls.__module__, cls_name, order_like)) + else: + # also show client-ish classes + if any(x in cls_name.lower() for x in ("client","pocket","async")): + print(f"Class: {cls.__module__}.{cls_name}") + print(" methods sample:", methods[:8]) + # small separator +print("\nDone. 
#!/usr/bin/env python3
"""Place a single demo order through a synchronous PocketOption client.

All imports of third-party/project code and all IO happen inside main(),
so importing this module has no side effects (the original could raise
SystemExit at import time).
"""
import os


def _import_client():
    """Import the PocketOption class from the module locations used by common forks."""
    try:
        # common sync API wrapper
        from pocketoptionapi_async.stable_api import PocketOption
        return PocketOption
    except Exception:
        # fallback to client (some forks expose sync client differently)
        try:
            from pocketoptionapi_async.client import PocketOption
            return PocketOption
        except Exception:
            raise SystemExit("Cannot import PocketOption class. Run grep as explained in README to find the class name.")


def main():
    from dotenv import load_dotenv
    load_dotenv()

    ssid = os.getenv("SSID")
    if not ssid:
        raise SystemExit("SSID not set. Put it in .env or export SSID in shell.")

    PocketOption = _import_client()
    api = PocketOption(ssid)  # instantiate client
    ok, msg = api.connect()   # many forks return (ok, msg) from connect
    if not ok:
        print("Connect failed:", msg)
        return

    # --- change these to an asset available in demo account ---
    amount = 1.0
    asset = "EURUSD"    # example — use an asset listed by the demo account
    direction = "call"  # or "put"
    duration = 60       # seconds or API-specific timeframe

    # Try common method names in order of likelihood
    for method in ("buy", "place_order", "create_order", "trade"):
        if hasattr(api, method):
            fn = getattr(api, method)
            res = None  # pre-bind so the print below can never hit an unbound name
            try:
                print(f"Using method: {method}")
                res = fn(amount, asset, duration, direction)  # many libs use this signature
            except TypeError:
                # positional signature mismatch -> try keyword style
                try:
                    res = fn(amount=amount, asset=asset, direction=direction, duration=duration)
                except Exception as e:
                    print("Method exists but calling failed:", e)
            print("Result:", res)
            break
    else:
        print("No typical order method found on api object. See instructions to grep for available methods.")

    try:
        # read balance or open positions if available
        if hasattr(api, "get_balance"):
            print("Balance:", api.get_balance())
        elif hasattr(api, "balance"):
            print("Balance property:", api.balance)
    except Exception:
        pass

    api.close()


if __name__ == "__main__":
    main()
"""
Advanced Connection Monitor and Diagnostics Tool
Real-time monitoring, diagnostics, and performance analysis
"""

import asyncio
import time
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, asdict
from collections import deque, defaultdict
import statistics
from loguru import logger

# Fix: package-qualified import. The bare `from client import ...` only resolved
# when the package directory itself was on sys.path; this matches the fix the
# patch applies to the live connection_monitor.py.
from pocketoptionapi_async.client import AsyncPocketOptionClient


@dataclass
class ConnectionMetrics:
    """Connection performance metrics"""

    timestamp: datetime
    connection_time: float
    ping_time: Optional[float]
    message_count: int
    error_count: int
    region: str
    status: str


@dataclass
class PerformanceSnapshot:
    """Performance snapshot"""

    timestamp: datetime
    memory_usage_mb: float
    cpu_percent: float
    active_connections: int
    messages_per_second: float
    error_rate: float
    avg_response_time: float


class ConnectionMonitor:
    """Advanced connection monitoring and diagnostics"""

    def __init__(self, ssid: str, is_demo: bool = True):
        self.ssid = ssid
        self.is_demo = is_demo

        # Monitoring state
        self.is_monitoring = False
        self.monitor_task: Optional[asyncio.Task] = None
        self.client: Optional[AsyncPocketOptionClient] = None

        # Metrics storage (bounded deques so long runs cannot grow unboundedly)
        self.connection_metrics: deque = deque(maxlen=1000)
        self.performance_snapshots: deque = deque(maxlen=500)
        self.error_log: deque = deque(maxlen=200)
        self.message_stats: Dict[str, int] = defaultdict(int)

        # Real-time stats
        self.start_time = datetime.now()
        self.total_messages = 0
        self.total_errors = 0
        self.last_ping_time = None
        self.ping_times: deque = deque(maxlen=100)

        # Event handlers
        self.event_handlers: Dict[str, List[Callable]] = defaultdict(list)

        # Performance tracking
        self.response_times: deque = deque(maxlen=100)
        self.connection_attempts = 0
        self.successful_connections = 0

    async def start_monitoring(self, persistent_connection: bool = True) -> bool:
        """Connect the client and start the background monitoring loop.

        Returns True on success, False when connection or setup failed.
        """
        logger.info("Analysis: Starting connection monitoring...")

        try:
            # Initialize client
            self.client = AsyncPocketOptionClient(
                self.ssid,
                is_demo=self.is_demo,
                persistent_connection=persistent_connection,
                auto_reconnect=True,
            )

            # Setup event handlers
            self._setup_event_handlers()

            # Connect
            self.connection_attempts += 1
            start_time = time.time()

            success = await self.client.connect()

            if success:
                connection_time = time.time() - start_time
                self.successful_connections += 1

                # Record connection metrics
                self._record_connection_metrics(connection_time, "CONNECTED")

                # Start monitoring tasks
                self.is_monitoring = True
                self.monitor_task = asyncio.create_task(self._monitoring_loop())

                logger.success(
                    f"Success: Monitoring started (connection time: {connection_time:.3f}s)"
                )
                return True
            else:
                self._record_connection_metrics(0, "FAILED")
                logger.error("Error: Failed to connect for monitoring")
                return False

        except Exception as e:
            self.total_errors += 1
            self._record_error("monitoring_start", str(e))
            logger.error(f"Error: Failed to start monitoring: {e}")
            return False

    async def stop_monitoring(self):
        """Stop the monitoring loop and disconnect the client."""
        logger.info("Stopping connection monitoring...")

        self.is_monitoring = False

        if self.monitor_task and not self.monitor_task.done():
            self.monitor_task.cancel()
            try:
                await self.monitor_task
            except asyncio.CancelledError:
                pass

        if self.client:
            await self.client.disconnect()

        logger.info("Success: Monitoring stopped")

    def _setup_event_handlers(self):
        """Register monitoring callbacks on the client's connection/data events."""
        if not self.client:
            return

        # Connection events
        self.client.add_event_callback("connected", self._on_connected)
        self.client.add_event_callback("disconnected", self._on_disconnected)
        self.client.add_event_callback("reconnected", self._on_reconnected)
        self.client.add_event_callback("auth_error", self._on_auth_error)

        # Data events
        self.client.add_event_callback("balance_updated", self._on_balance_updated)
        self.client.add_event_callback("candles_received", self._on_candles_received)
        self.client.add_event_callback("message_received", self._on_message_received)

    async def _monitoring_loop(self):
        """Main monitoring loop: collect metrics, health-check, ping, emit events."""
        logger.info("Persistent: Starting monitoring loop...")

        while self.is_monitoring:
            try:
                # Collect performance snapshot
                await self._collect_performance_snapshot()

                # Check connection health
                await self._check_connection_health()

                # Send ping and measure response
                await self._measure_ping_response()

                # Emit monitoring events
                await self._emit_monitoring_events()

                await asyncio.sleep(5)  # Monitor every 5 seconds

            except Exception as e:
                self.total_errors += 1
                self._record_error("monitoring_loop", str(e))
                logger.error(f"Error: Monitoring loop error: {e}")
                # Fix: back off on failure too; without this a persistent error
                # (raised before the sleep above) made the loop spin hot.
                await asyncio.sleep(5)

    async def _collect_performance_snapshot(self):
        """Collect performance metrics snapshot"""
        try:
            # Try to get system metrics (psutil is optional)
            memory_mb = 0
            cpu_percent = 0

            try:
                import psutil
                import os

                process = psutil.Process(os.getpid())
                memory_mb = process.memory_info().rss / 1024 / 1024
                cpu_percent = process.cpu_percent()
            except ImportError:
                pass

            # Calculate messages per second
            uptime = (datetime.now() - self.start_time).total_seconds()
            messages_per_second = self.total_messages / uptime if uptime > 0 else 0

            # Calculate error rate
            error_rate = self.total_errors / max(self.total_messages, 1)

            # Calculate average response time
            avg_response_time = (
                statistics.mean(self.response_times) if self.response_times else 0
            )

            snapshot = PerformanceSnapshot(
                timestamp=datetime.now(),
                memory_usage_mb=memory_mb,
                cpu_percent=cpu_percent,
                active_connections=1 if self.client and self.client.is_connected else 0,
                messages_per_second=messages_per_second,
                error_rate=error_rate,
                avg_response_time=avg_response_time,
            )

            self.performance_snapshots.append(snapshot)

        except Exception as e:
            logger.error(f"Error: Error collecting performance snapshot: {e}")

    async def _check_connection_health(self):
        """Check connection health status"""
        if not self.client:
            return

        try:
            # Check if still connected
            if not self.client.is_connected:
                self._record_connection_metrics(0, "DISCONNECTED")
                return

            # Try to get balance as health check
            start_time = time.time()
            balance = await self.client.get_balance()
            response_time = time.time() - start_time

            self.response_times.append(response_time)

            if balance:
                self._record_connection_metrics(response_time, "HEALTHY")
            else:
                self._record_connection_metrics(response_time, "UNHEALTHY")

        except Exception as e:
            self.total_errors += 1
            self._record_error("health_check", str(e))
            self._record_connection_metrics(0, "ERROR")

    async def _measure_ping_response(self):
        """Measure ping response time"""
        if not self.client or not self.client.is_connected:
            return

        try:
            start_time = time.time()
            await self.client.send_message('42["ps"]')

            # Note: We can't easily measure the actual ping response time
            # since it's handled internally. This measures send time.
            ping_time = time.time() - start_time

            self.ping_times.append(ping_time)
            self.last_ping_time = datetime.now()

            self.total_messages += 1
            self.message_stats["ping"] += 1

        except Exception as e:
            self.total_errors += 1
            self._record_error("ping_measure", str(e))

    async def _emit_monitoring_events(self):
        """Emit monitoring events"""
        try:
            # Emit real-time stats
            stats = self.get_real_time_stats()
            await self._emit_event("stats_update", stats)

            # Emit alerts if needed
            await self._check_and_emit_alerts(stats)

        except Exception as e:
            logger.error(f"Error: Error emitting monitoring events: {e}")

    async def _check_and_emit_alerts(self, stats: Dict[str, Any]):
        """Check for alert conditions and emit alerts"""

        # High error rate alert
        if stats["error_rate"] > 0.1:  # 10% error rate
            await self._emit_event(
                "alert",
                {
                    "type": "high_error_rate",
                    "value": stats["error_rate"],
                    "threshold": 0.1,
                    "message": f"High error rate detected: {stats['error_rate']:.1%}",
                },
            )

        # Slow response time alert
        if stats["avg_response_time"] > 5.0:  # 5 seconds
            await self._emit_event(
                "alert",
                {
                    "type": "slow_response",
                    "value": stats["avg_response_time"],
                    "threshold": 5.0,
                    "message": f"Slow response time: {stats['avg_response_time']:.2f}s",
                },
            )

        # Connection issues alert
        if not stats["is_connected"]:
            await self._emit_event(
                "alert", {"type": "connection_lost", "message": "Connection lost"}
            )

        # Memory usage alert (if available)
        if "memory_usage_mb" in stats and stats["memory_usage_mb"] > 500:  # 500MB
            await self._emit_event(
                "alert",
                {
                    "type": "high_memory",
                    "value": stats["memory_usage_mb"],
                    "threshold": 500,
                    "message": f"High memory usage: {stats['memory_usage_mb']:.1f}MB",
                },
            )

    def _record_connection_metrics(self, connection_time: float, status: str):
        """Record connection metrics"""
        region = "UNKNOWN"
        if self.client and self.client.connection_info:
            region = self.client.connection_info.region or "UNKNOWN"

        metrics = ConnectionMetrics(
            timestamp=datetime.now(),
            connection_time=connection_time,
            ping_time=self.ping_times[-1] if self.ping_times else None,
            message_count=self.total_messages,
            error_count=self.total_errors,
            region=region,
            status=status,
        )

        self.connection_metrics.append(metrics)

    def _record_error(self, error_type: str, error_message: str):
        """Record error for analysis"""
        error_record = {
            "timestamp": datetime.now(),
            "type": error_type,
            "message": error_message,
        }
        self.error_log.append(error_record)

    async def _emit_event(self, event_type: str, data: Any):
        """Emit event to registered handlers (sync or async callables)."""
        if event_type in self.event_handlers:
            for handler in self.event_handlers[event_type]:
                try:
                    if asyncio.iscoroutinefunction(handler):
                        await handler(data)
                    else:
                        handler(data)
                except Exception as e:
                    logger.error(f"Error: Error in event handler for {event_type}: {e}")

    # Event handler methods
    async def _on_connected(self, data):
        self.total_messages += 1
        self.message_stats["connected"] += 1
        logger.info("Connection established")

    async def _on_disconnected(self, data):
        self.total_messages += 1
        self.message_stats["disconnected"] += 1
        logger.warning("Connection lost")

    async def _on_reconnected(self, data):
        self.total_messages += 1
        self.message_stats["reconnected"] += 1
        logger.info("Connection restored")

    async def _on_auth_error(self, data):
        self.total_errors += 1
        self.message_stats["auth_error"] += 1
        self._record_error("auth_error", str(data))
        logger.error("Authentication error")

    async def _on_balance_updated(self, data):
        self.total_messages += 1
        self.message_stats["balance"] += 1

    async def _on_candles_received(self, data):
        self.total_messages += 1
        self.message_stats["candles"] += 1

    async def _on_message_received(self, data):
        self.total_messages += 1
        self.message_stats["message"] += 1

    def add_event_handler(self, event_type: str, handler: Callable):
        """Add event handler for monitoring events"""
        self.event_handlers[event_type].append(handler)

    def get_real_time_stats(self) -> Dict[str, Any]:
        """Get current real-time statistics"""
        uptime = datetime.now() - self.start_time

        stats = {
            "uptime": uptime.total_seconds(),
            "uptime_str": str(uptime).split(".")[0],
            "total_messages": self.total_messages,
            "total_errors": self.total_errors,
            "error_rate": self.total_errors / max(self.total_messages, 1),
            "messages_per_second": self.total_messages / uptime.total_seconds()
            if uptime.total_seconds() > 0
            else 0,
            "connection_attempts": self.connection_attempts,
            "successful_connections": self.successful_connections,
            "connection_success_rate": self.successful_connections
            / max(self.connection_attempts, 1),
            "is_connected": self.client.is_connected if self.client else False,
            "last_ping_time": self.last_ping_time.isoformat()
            if self.last_ping_time
            else None,
            "message_types": dict(self.message_stats),
        }

        # Add response time stats
        if self.response_times:
            stats.update(
                {
                    "avg_response_time": statistics.mean(self.response_times),
                    "min_response_time": min(self.response_times),
                    "max_response_time": max(self.response_times),
                    "median_response_time": statistics.median(self.response_times),
                }
            )

        # Add ping stats
        if self.ping_times:
            stats.update(
                {
                    "avg_ping_time": statistics.mean(self.ping_times),
                    "min_ping_time": min(self.ping_times),
                    "max_ping_time": max(self.ping_times),
                }
            )

        # Add latest performance snapshot data
        if self.performance_snapshots:
            latest = self.performance_snapshots[-1]
            stats.update(
                {
                    "memory_usage_mb": latest.memory_usage_mb,
                    "cpu_percent": latest.cpu_percent,
                }
            )

        return stats

    def get_historical_metrics(self, hours: int = 1) -> Dict[str, Any]:
        """Get historical metrics for the specified time period"""
        cutoff_time = datetime.now() - timedelta(hours=hours)

        # Filter metrics
        recent_metrics = [
            m for m in self.connection_metrics if m.timestamp > cutoff_time
        ]
        recent_snapshots = [
            s for s in self.performance_snapshots if s.timestamp > cutoff_time
        ]
        recent_errors = [e for e in self.error_log if e["timestamp"] > cutoff_time]

        historical = {
            "time_period_hours": hours,
            "connection_metrics_count": len(recent_metrics),
            "performance_snapshots_count": len(recent_snapshots),
            "error_count": len(recent_errors),
            "metrics": [asdict(m) for m in recent_metrics],
            "snapshots": [asdict(s) for s in recent_snapshots],
            "errors": recent_errors,
        }

        # Calculate trends
        if recent_snapshots:
            memory_values = [
                s.memory_usage_mb for s in recent_snapshots if s.memory_usage_mb > 0
            ]
            response_values = [
                s.avg_response_time for s in recent_snapshots if s.avg_response_time > 0
            ]

            if memory_values:
                historical["memory_trend"] = {
                    "avg": statistics.mean(memory_values),
                    "min": min(memory_values),
                    "max": max(memory_values),
                    "trend": "increasing"
                    if len(memory_values) > 1 and memory_values[-1] > memory_values[0]
                    else "stable",
                }

            if response_values:
                historical["response_time_trend"] = {
                    "avg": statistics.mean(response_values),
                    "min": min(response_values),
                    "max": max(response_values),
                    "trend": "improving"
                    if len(response_values) > 1
                    and response_values[-1] < response_values[0]
                    else "stable",
                }

        return historical

    def generate_diagnostics_report(self) -> Dict[str, Any]:
        """Generate comprehensive diagnostics report"""
        stats = self.get_real_time_stats()
        historical = self.get_historical_metrics(hours=2)

        # Health assessment
        health_score = 100
        health_issues = []

        if stats["error_rate"] > 0.05:
            health_score -= 20
            health_issues.append(f"High error rate: {stats['error_rate']:.1%}")

        if not stats["is_connected"]:
            health_score -= 30
            health_issues.append("Not connected")

        if stats.get("avg_response_time", 0) > 3.0:
            health_score -= 15
            health_issues.append(
                f"Slow response time: {stats.get('avg_response_time', 0):.2f}s"
            )

        if stats["connection_success_rate"] < 0.9:
            health_score -= 10
            health_issues.append(
                f"Low connection success rate: {stats['connection_success_rate']:.1%}"
            )

        health_score = max(0, health_score)

        # Recommendations
        recommendations = []

        if stats["error_rate"] > 0.1:
            recommendations.append(
                "High error rate detected. Check network connectivity and SSID validity."
            )

        if stats.get("avg_response_time", 0) > 5.0:
            recommendations.append(
                "Slow response times. Consider using persistent connections or different region."
            )

        if stats.get("memory_usage_mb", 0) > 300:
            recommendations.append(
                "High memory usage detected. Monitor for memory leaks."
            )

        if not recommendations:
            recommendations.append("System is operating normally.")

        report = {
            "timestamp": datetime.now().isoformat(),
            "health_score": health_score,
            "health_status": "EXCELLENT"
            if health_score > 90
            else "GOOD"
            if health_score > 70
            else "FAIR"
            if health_score > 50
            else "POOR",
            "health_issues": health_issues,
            "recommendations": recommendations,
            "real_time_stats": stats,
            "historical_metrics": historical,
            "connection_summary": {
                "total_attempts": stats["connection_attempts"],
                "successful_connections": stats["successful_connections"],
                "current_status": "CONNECTED"
                if stats["is_connected"]
                else "DISCONNECTED",
                "uptime": stats["uptime_str"],
            },
        }

        return report

    def export_metrics_csv(self, filename: str = "") -> str:
        """Export metrics to CSV file; returns the filename written."""
        if not filename:
            filename = f"metrics_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"

        try:
            import pandas as pd

            # Convert metrics to DataFrame
            metrics_data = []
            for metric in self.connection_metrics:
                metrics_data.append(asdict(metric))

            if metrics_data:
                df = pd.DataFrame(metrics_data)
                df.to_csv(filename, index=False)
                # Fix: log the actual filename (the original logged the literal
                # text "(unknown)" -- a broken format string).
                logger.info(f"Statistics: Metrics exported to {filename}")
            else:
                logger.warning("No metrics data to export")

            return filename

        except ImportError:
            logger.error("pandas not available for CSV export")

            # Fallback: basic CSV export
            import csv

            with open(filename, "w", newline="") as csvfile:
                if self.connection_metrics:
                    fieldnames = asdict(self.connection_metrics[0]).keys()
                    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                    writer.writeheader()
                    for metric in self.connection_metrics:
                        writer.writerow(asdict(metric))

            return filename


class RealTimeDisplay:
    """Real-time console display for monitoring"""

    def __init__(self, monitor: ConnectionMonitor):
        self.monitor = monitor
        self.display_task: Optional[asyncio.Task] = None
        self.is_displaying = False

    async def start_display(self):
        """Start real-time display"""
        self.is_displaying = True
        self.display_task = asyncio.create_task(self._display_loop())

    async def stop_display(self):
        """Stop real-time display"""
        self.is_displaying = False
        if self.display_task and not self.display_task.done():
            self.display_task.cancel()
            try:
                await self.display_task
            except asyncio.CancelledError:
                pass

    async def _display_loop(self):
        """Display loop"""
        while self.is_displaying:
            try:
                # Clear screen (ANSI escape sequence)
                print("\033[2J\033[H", end="")

                # Display header
                print("Analysis: PocketOption API Connection Monitor")
                print("=" * 60)

                # Get stats
                stats = self.monitor.get_real_time_stats()

                # Display connection status
                status = "Connected" if stats["is_connected"] else "Disconnected"
                print(f"Status: {status}")
                print(f"Uptime: {stats['uptime_str']}")
                print()

                # Display metrics
                print("Statistics: Metrics:")
                print(f"  Messages: {stats['total_messages']}")
                print(f"  Errors: {stats['total_errors']}")
                print(f"  Error Rate: {stats['error_rate']:.1%}")
                print(f"  Messages/sec: {stats['messages_per_second']:.2f}")
                print()

                # Display performance
                if "avg_response_time" in stats:
                    print("Performance:")
                    print(f"  Avg Response: {stats['avg_response_time']:.3f}s")
                    print(f"  Min Response: {stats['min_response_time']:.3f}s")
                    print(f"  Max Response: {stats['max_response_time']:.3f}s")
                    print()

                # Display memory if available
                if "memory_usage_mb" in stats:
                    print("Resources:")
                    print(f"  Memory: {stats['memory_usage_mb']:.1f} MB")
                    print(f"  CPU: {stats['cpu_percent']:.1f}%")
                    print()

                # Display message types
                if stats["message_types"]:
                    print("Message: Message Types:")
                    for msg_type, count in stats["message_types"].items():
                        print(f"  {msg_type}: {count}")
                    print()

                print("Press Ctrl+C to stop monitoring...")

                await asyncio.sleep(2)  # Update every 2 seconds

            except Exception as e:
                logger.error(f"Display error: {e}")
                await asyncio.sleep(1)


async def run_monitoring_demo(ssid: Optional[str] = None):
    """Run monitoring demonstration"""

    if not ssid:
        ssid = r'42["auth",{"session":"demo_session_for_monitoring","isDemo":1,"uid":0,"platform":1}]'
        logger.warning("Caution: Using demo SSID for monitoring")

    logger.info("Analysis: Starting Advanced Connection Monitor Demo")

    # Create monitor
    monitor = ConnectionMonitor(ssid, is_demo=True)

    # Add event handlers for alerts
    async def on_alert(alert_data):
        logger.warning(f"Alert: ALERT: {alert_data['message']}")

    async def on_stats_update(stats):
        # Could send to external monitoring system
        pass

    monitor.add_event_handler("alert", on_alert)
    monitor.add_event_handler("stats_update", on_stats_update)

    # Create real-time display
    display = RealTimeDisplay(monitor)

    try:
        # Start monitoring
        success = await monitor.start_monitoring(persistent_connection=True)

        if success:
            # Start real-time display
            await display.start_display()

            # Let it run for a while
            await asyncio.sleep(120)  # Run for 2 minutes

        else:
            logger.error("Error: Failed to start monitoring")

    except KeyboardInterrupt:
        logger.info("Stopping: Monitoring stopped by user")

    finally:
        # Stop display and monitoring
        await display.stop_display()
        await monitor.stop_monitoring()

        # Generate final report
        report = monitor.generate_diagnostics_report()

        logger.info("\nCompleted: FINAL DIAGNOSTICS REPORT")
        logger.info("=" * 50)
        logger.info(
            f"Health Score: {report['health_score']}/100 ({report['health_status']})"
        )

        if report["health_issues"]:
            logger.warning("Issues found:")
            for issue in report["health_issues"]:
                logger.warning(f"  - {issue}")

        logger.info("Recommendations:")
        for rec in report["recommendations"]:
            logger.info(f"  - {rec}")

        # Save detailed report
        report_file = (
            f"monitoring_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        )
        with open(report_file, "w") as f:
            json.dump(report, f, indent=2, default=str)

        logger.info(f"Report: Detailed report saved to: {report_file}")

        # Export metrics
        metrics_file = monitor.export_metrics_csv()
        logger.info(f"Statistics: Metrics exported to: {metrics_file}")


if __name__ == "__main__":
    import sys

    # Allow passing SSID as command line argument
    ssid = None
    if len(sys.argv) > 1:
        ssid = sys.argv[1]
        logger.info(f"Using provided SSID: {ssid[:50]}...")

    asyncio.run(run_monitoring_demo(ssid))