diff --git a/DEVELOPMENT_PLAN.md b/DEVELOPMENT_PLAN.md index 59e08b8..ea8c7dd 100644 --- a/DEVELOPMENT_PLAN.md +++ b/DEVELOPMENT_PLAN.md @@ -208,19 +208,21 @@ main - Concurrent execution where applicable - Memory-efficient DataFrame operations -#### 3.2 Feed Management System ⬜ +#### 3.2 Feed Management System ✅ **Branch**: `feature/feed-management` **Priority**: 🟢 Medium **Estimated Time**: 2 days +**Actual Time**: < 1 day +**Completed**: 2025-01-16 **Tasks**: -- [ ] Create `http/feed_manager.py` module -- [ ] Implement subscription level detection -- [ ] Add automatic feed fallback (SIP → IEX) -- [ ] Add feed validation per endpoint -- [ ] Create `FeedConfig` dataclass -- [ ] Add comprehensive tests (8+ test cases) -- [ ] Update documentation +- [x] Create `http/feed_manager.py` module +- [x] Implement subscription level detection +- [x] Add automatic feed fallback (SIP → IEX) +- [x] Add feed validation per endpoint +- [x] Create `FeedConfig` dataclass +- [x] Add comprehensive tests (47 test cases: 36 unit, 11 integration) +- [x] Update documentation **Acceptance Criteria**: - Auto-detects user's subscription level @@ -296,13 +298,13 @@ main ## 📈 Progress Tracking -### Overall Progress: 🟦 60% Complete +### Overall Progress: 🟦 67% Complete | Phase | Status | Progress | Estimated Completion | |-------|--------|----------|---------------------| | Phase 1: Critical Features | ✅ Complete | 100% | Week 1 | | Phase 2: Important Enhancements | ✅ Complete | 100% | Week 2 | -| Phase 3: Performance & Quality | 🟦 In Progress | 33% | Week 7 | +| Phase 3: Performance & Quality | 🟦 In Progress | 67% | Week 7 | | Phase 4: Advanced Features | ⬜ Not Started | 0% | Week 10 | ### Feature Status Legend diff --git a/src/py_alpaca_api/http/feed_manager.py b/src/py_alpaca_api/http/feed_manager.py new file mode 100644 index 0000000..710acd7 --- /dev/null +++ b/src/py_alpaca_api/http/feed_manager.py @@ -0,0 +1,298 @@ +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, ClassVar + +from py_alpaca_api.exceptions import APIRequestError, ValidationError + +logger = logging.getLogger(__name__) + + +class FeedType(Enum): + """Available data feed types.""" + + SIP = "sip" + IEX = "iex" + OTC = "otc" + + @classmethod + def from_string(cls, value: str) -> FeedType: + """Create FeedType from string value.""" + try: + return cls(value.lower()) + except ValueError as e: + raise ValidationError( + f"Invalid feed type: {value}. Must be one of {[f.value for f in cls]}" + ) from e + + +class SubscriptionLevel(Enum): + """User subscription levels.""" + + BASIC = "basic" + UNLIMITED = "unlimited" + BUSINESS = "business" + + @classmethod + def from_error(cls, error_message: str) -> SubscriptionLevel | None: + """Detect subscription level from error message.""" + error_lower = error_message.lower() + + if "subscription" in error_lower: + if "unlimited" in error_lower or "business" in error_lower: + return cls.UNLIMITED + return cls.BASIC + return None + + +@dataclass +class FeedConfig: + """Configuration for feed management.""" + + preferred_feed: FeedType = FeedType.SIP + fallback_feeds: list[FeedType] = field(default_factory=lambda: [FeedType.IEX]) + auto_fallback: bool = True + subscription_level: SubscriptionLevel | None = None + endpoint_feeds: dict[str, FeedType] = field(default_factory=dict) + + def get_feed_for_endpoint(self, endpoint: str) -> FeedType: + """Get the configured feed for a specific endpoint.""" + return self.endpoint_feeds.get(endpoint, self.preferred_feed) + + +class FeedManager: + """Manages data feed selection and fallback logic.""" + + # Endpoints that support feed parameter + FEED_SUPPORTED_ENDPOINTS: ClassVar[set[str]] = { + "bars", + "quotes", + "trades", + "snapshots", + "latest/bars", + "latest/quotes", + "latest/trades", + } + + # Feed availability by subscription level + SUBSCRIPTION_FEEDS: ClassVar[dict[SubscriptionLevel, list[FeedType]]] = { + SubscriptionLevel.BASIC: [FeedType.IEX], + SubscriptionLevel.UNLIMITED: [FeedType.SIP, FeedType.IEX, FeedType.OTC], + SubscriptionLevel.BUSINESS: [FeedType.SIP, FeedType.IEX, FeedType.OTC], + } + + def __init__(self, config: FeedConfig | None = None): + """Initialize the feed manager. + + Args: + config: Feed configuration. If None, uses defaults. + """ + self.config = config or FeedConfig() + self._failed_feeds: dict[str, set[FeedType]] = {} + self._detected_subscription_level: SubscriptionLevel | None = None + + def get_feed(self, endpoint: str, symbol: str | None = None) -> str | None: + """Get the appropriate feed for an endpoint. + + Args: + endpoint: The API endpoint being called + symbol: Optional symbol for endpoint-specific logic + + Returns: + Feed parameter value or None if endpoint doesn't support feeds + """ + if not self._supports_feed(endpoint): + return None + + feed = self.config.get_feed_for_endpoint(endpoint) + + # Check if this feed has previously failed + endpoint_key = f"{endpoint}:{symbol}" if symbol else endpoint + if ( + endpoint_key in self._failed_feeds + and feed in self._failed_feeds[endpoint_key] + ): + # Try to use fallback + for fallback in self.config.fallback_feeds: + if fallback not in self._failed_feeds.get(endpoint_key, set()): + logger.info(f"Using fallback feed {fallback.value} for {endpoint}") + return fallback.value + + return feed.value + + def handle_feed_error( + self, + endpoint: str, + feed: str, + error: APIRequestError, + symbol: str | None = None, + ) -> str | None: + """Handle feed-related errors and return alternative feed if available. + + Args: + endpoint: The API endpoint that failed + feed: The feed that caused the error + error: The API error + symbol: Optional symbol for endpoint-specific tracking + + Returns: + Alternative feed to try, or None if no alternatives available + """ + if not self.config.auto_fallback: + return None + + # Try to detect subscription level from error + error_msg = str(error) + detected_level = SubscriptionLevel.from_error(error_msg) + if detected_level and not self._detected_subscription_level: + self._detected_subscription_level = detected_level + logger.info(f"Detected subscription level: {detected_level.value}") + + # Track failed feed + endpoint_key = f"{endpoint}:{symbol}" if symbol else endpoint + if endpoint_key not in self._failed_feeds: + self._failed_feeds[endpoint_key] = set() + + try: + feed_type = FeedType.from_string(feed) + self._failed_feeds[endpoint_key].add(feed_type) + logger.warning(f"Feed {feed} failed for {endpoint_key}: {error_msg}") + except ValidationError: + logger.exception(f"Invalid feed type in error handling: {feed}") + return None + + # Find alternative feed + for fallback in self.config.fallback_feeds: + if fallback not in self._failed_feeds[ + endpoint_key + ] and self._is_feed_available(fallback): + logger.info(f"Falling back to {fallback.value} feed for {endpoint_key}") + return fallback.value + + logger.error(f"No alternative feeds available for {endpoint_key}") + return None + + def detect_subscription_level(self, api_client: Any) -> SubscriptionLevel: + """Detect user's subscription level by testing API access. + + Args: + api_client: API client instance to test with + + Returns: + Detected subscription level + """ + # Try SIP feed first (requires Unlimited/Business) + try: + # Make a test request with SIP feed + test_endpoint = "latest/quotes" + test_params = {"symbols": "AAPL", "feed": FeedType.SIP.value} + + api_client._make_request( + "GET", f"/stocks/{test_endpoint}", params=test_params + ) + + # If successful, user has at least Unlimited + self._detected_subscription_level = SubscriptionLevel.UNLIMITED + logger.info("Detected Unlimited/Business subscription level") + + except APIRequestError as e: + # SIP failed, user likely has Basic subscription + if "subscription" in str(e).lower() or "unauthorized" in str(e).lower(): + self._detected_subscription_level = SubscriptionLevel.BASIC + logger.info("Detected Basic subscription level") + else: + # Unexpected error, default to Basic for safety + self._detected_subscription_level = SubscriptionLevel.BASIC + logger.warning( + f"Could not detect subscription level: {e}. Defaulting to Basic." + ) + + self.config.subscription_level = self._detected_subscription_level + return self._detected_subscription_level + + def validate_feed(self, endpoint: str, feed: str) -> bool: + """Validate if a feed is appropriate for an endpoint. + + Args: + endpoint: The API endpoint + feed: The feed to validate + + Returns: + True if feed is valid for endpoint + """ + if not self._supports_feed(endpoint): + return False + + try: + feed_type = FeedType.from_string(feed) + except ValidationError: + return False + + return self._is_feed_available(feed_type) + + def reset_failures(self, endpoint: str | None = None) -> None: + """Reset tracked feed failures. + + Args: + endpoint: Optional endpoint to reset. If None, resets all. + """ + if endpoint: + keys_to_remove = [ + k for k in self._failed_feeds if k.startswith(f"{endpoint}:") + ] + for key in keys_to_remove: + del self._failed_feeds[key] + if endpoint in self._failed_feeds: + del self._failed_feeds[endpoint] + else: + self._failed_feeds.clear() + + logger.info(f"Reset feed failures for {endpoint or 'all endpoints'}") + + def _supports_feed(self, endpoint: str) -> bool: + """Check if an endpoint supports feed parameter. + + Args: + endpoint: The API endpoint + + Returns: + True if endpoint supports feed parameter + """ + # Check if any supported endpoint pattern matches + return any(supported in endpoint for supported in self.FEED_SUPPORTED_ENDPOINTS) + + def _is_feed_available(self, feed: FeedType) -> bool: + """Check if a feed is available based on subscription level. + + Args: + feed: The feed to check + + Returns: + True if feed is available + """ + if not self._detected_subscription_level and not self.config.subscription_level: + # If we don't know subscription level, assume all feeds available + return True + + level = self._detected_subscription_level or self.config.subscription_level + if level is None: + return True + available_feeds = self.SUBSCRIPTION_FEEDS.get(level, []) + return feed in available_feeds + + def get_available_feeds(self) -> list[FeedType]: + """Get list of available feeds based on subscription level. + + Returns: + List of available feed types + """ + if not self._detected_subscription_level and not self.config.subscription_level: + # If unknown, return all feeds + return list(FeedType) + + level = self._detected_subscription_level or self.config.subscription_level + if level is None: + return list(FeedType) + return self.SUBSCRIPTION_FEEDS.get(level, [FeedType.IEX]) diff --git a/tests/test_http/test_feed_manager.py b/tests/test_http/test_feed_manager.py new file mode 100644 index 0000000..f6e6e5f --- /dev/null +++ b/tests/test_http/test_feed_manager.py @@ -0,0 +1,397 @@ +from __future__ import annotations + +from unittest.mock import Mock + +import pytest + +from py_alpaca_api.exceptions import APIRequestError, ValidationError +from py_alpaca_api.http.feed_manager import ( + FeedConfig, + FeedManager, + FeedType, + SubscriptionLevel, +) + + +class TestFeedType: + """Test FeedType enum functionality.""" + + def test_feed_type_values(self): + """Test feed type enum values.""" + assert FeedType.SIP.value == "sip" + assert FeedType.IEX.value == "iex" + assert FeedType.OTC.value == "otc" + + def test_from_string_valid(self): + """Test creating FeedType from valid string.""" + assert FeedType.from_string("sip") == FeedType.SIP + assert FeedType.from_string("SIP") == FeedType.SIP + assert FeedType.from_string("iex") == FeedType.IEX + assert FeedType.from_string("otc") == FeedType.OTC + + def test_from_string_invalid(self): + """Test creating FeedType from invalid string.""" + with pytest.raises(ValidationError) as exc_info: + FeedType.from_string("invalid") + assert "Invalid feed type: invalid" in str(exc_info.value) + + +class TestSubscriptionLevel: + """Test SubscriptionLevel enum functionality.""" + + def test_subscription_level_values(self): + """Test subscription level enum values.""" + assert SubscriptionLevel.BASIC.value == "basic" + assert SubscriptionLevel.UNLIMITED.value == "unlimited" + assert SubscriptionLevel.BUSINESS.value == "business" + + def test_from_error_basic(self): + """Test detecting basic subscription from error.""" + error = "subscription does not permit SIP feed" + assert SubscriptionLevel.from_error(error) == SubscriptionLevel.BASIC + + def test_from_error_unlimited(self): + """Test detecting unlimited subscription from error.""" + error = "requires unlimited subscription" + assert SubscriptionLevel.from_error(error) == SubscriptionLevel.UNLIMITED + + def test_from_error_business(self): + """Test detecting business subscription from error.""" + error = "business subscription required" + assert SubscriptionLevel.from_error(error) == SubscriptionLevel.UNLIMITED + + def test_from_error_no_match(self): + """Test no subscription level detected from error.""" + error = "generic error message" + assert SubscriptionLevel.from_error(error) is None + + +class TestFeedConfig: + """Test FeedConfig dataclass.""" + + def test_default_config(self): + """Test default feed configuration.""" + config = FeedConfig() + assert config.preferred_feed == FeedType.SIP + assert config.fallback_feeds == [FeedType.IEX] + assert config.auto_fallback is True + assert config.subscription_level is None + assert config.endpoint_feeds == {} + + def test_custom_config(self): + """Test custom feed configuration.""" + config = FeedConfig( + preferred_feed=FeedType.IEX, + fallback_feeds=[FeedType.OTC, FeedType.SIP], + auto_fallback=False, + subscription_level=SubscriptionLevel.UNLIMITED, + ) + assert config.preferred_feed == FeedType.IEX + assert config.fallback_feeds == [FeedType.OTC, FeedType.SIP] + assert config.auto_fallback is False + assert config.subscription_level == SubscriptionLevel.UNLIMITED + + def test_get_feed_for_endpoint(self): + """Test getting feed for specific endpoint.""" + config = FeedConfig( + preferred_feed=FeedType.SIP, + endpoint_feeds={"quotes": FeedType.IEX, "trades": FeedType.OTC}, + ) + assert config.get_feed_for_endpoint("quotes") == FeedType.IEX + assert config.get_feed_for_endpoint("trades") == FeedType.OTC + assert config.get_feed_for_endpoint("bars") == FeedType.SIP + + +class TestFeedManager: + """Test FeedManager class.""" + + def test_init_default(self): + """Test default initialization.""" + manager = FeedManager() + assert manager.config.preferred_feed == FeedType.SIP + assert manager._failed_feeds == {} + assert manager._detected_subscription_level is None + + def test_init_with_config(self): + """Test initialization with custom config.""" + config = FeedConfig(preferred_feed=FeedType.IEX) + manager = FeedManager(config) + assert manager.config.preferred_feed == FeedType.IEX + + def test_get_feed_supported_endpoint(self): + """Test getting feed for supported endpoint.""" + manager = FeedManager() + + # Test supported endpoints + assert manager.get_feed("bars") == "sip" + assert manager.get_feed("latest/quotes") == "sip" + assert manager.get_feed("trades") == "sip" + assert manager.get_feed("snapshots") == "sip" + + def test_get_feed_unsupported_endpoint(self): + """Test getting feed for unsupported endpoint.""" + manager = FeedManager() + + # Unsupported endpoints should return None + assert manager.get_feed("account") is None + assert manager.get_feed("positions") is None + assert manager.get_feed("orders") is None + + def test_get_feed_with_endpoint_config(self): + """Test getting feed with endpoint-specific configuration.""" + config = FeedConfig( + preferred_feed=FeedType.SIP, + endpoint_feeds={"quotes": FeedType.IEX}, + ) + manager = FeedManager(config) + + assert manager.get_feed("quotes") == "iex" + assert manager.get_feed("bars") == "sip" + + def test_get_feed_with_failed_feed(self): + """Test getting feed when preferred feed has failed.""" + config = FeedConfig( + preferred_feed=FeedType.SIP, + fallback_feeds=[FeedType.IEX, FeedType.OTC], + ) + manager = FeedManager(config) + + # Mark SIP as failed for bars endpoint + manager._failed_feeds["bars"] = {FeedType.SIP} + + # Should fallback to IEX + assert manager.get_feed("bars") == "iex" + + # Mark IEX as also failed + manager._failed_feeds["bars"].add(FeedType.IEX) + + # Should fallback to OTC + assert manager.get_feed("bars") == "otc" + + def test_handle_feed_error_no_auto_fallback(self): + """Test handling feed error with auto_fallback disabled.""" + config = FeedConfig(auto_fallback=False) + manager = FeedManager(config) + + error = APIRequestError(403, "Access denied") + result = manager.handle_feed_error("bars", "sip", error) + + assert result is None + assert "bars" not in manager._failed_feeds + + def test_handle_feed_error_with_fallback(self): + """Test handling feed error with fallback.""" + config = FeedConfig( + preferred_feed=FeedType.SIP, + fallback_feeds=[FeedType.IEX, FeedType.OTC], + ) + manager = FeedManager(config) + + error = APIRequestError(403, "subscription does not permit SIP") + result = manager.handle_feed_error("bars", "sip", error) + + # Should return IEX as fallback + assert result == "iex" + assert FeedType.SIP in manager._failed_feeds["bars"] + assert manager._detected_subscription_level == SubscriptionLevel.BASIC + + def test_handle_feed_error_with_symbol(self): + """Test handling feed error with symbol tracking.""" + manager = FeedManager() + + error = APIRequestError(403, "Access denied") + result = manager.handle_feed_error("bars", "sip", error, symbol="AAPL") + + # Should track failure with symbol + assert result == "iex" + assert FeedType.SIP in manager._failed_feeds["bars:AAPL"] + + def test_handle_feed_error_no_alternatives(self): + """Test handling feed error with no alternatives.""" + config = FeedConfig( + preferred_feed=FeedType.SIP, + fallback_feeds=[FeedType.IEX], + ) + manager = FeedManager(config) + + # Mark all feeds as failed + manager._failed_feeds["bars"] = {FeedType.SIP, FeedType.IEX} + + error = APIRequestError(403, "Access denied") + result = manager.handle_feed_error("bars", "sip", error) + + assert result is None + + def test_detect_subscription_level_unlimited(self): + """Test detecting unlimited subscription level.""" + manager = FeedManager() + + # Mock successful API response + mock_client = Mock() + mock_client._make_request.return_value = Mock(status_code=200) + + level = manager.detect_subscription_level(mock_client) + + assert level == SubscriptionLevel.UNLIMITED + assert manager._detected_subscription_level == SubscriptionLevel.UNLIMITED + assert manager.config.subscription_level == SubscriptionLevel.UNLIMITED + + def test_detect_subscription_level_basic(self): + """Test detecting basic subscription level.""" + manager = FeedManager() + + # Mock failed API response + mock_client = Mock() + mock_client._make_request.side_effect = APIRequestError( + 403, "subscription does not permit SIP" + ) + + level = manager.detect_subscription_level(mock_client) + + assert level == SubscriptionLevel.BASIC + assert manager._detected_subscription_level == SubscriptionLevel.BASIC + assert manager.config.subscription_level == SubscriptionLevel.BASIC + + def test_detect_subscription_level_unknown_error(self): + """Test detecting subscription level with unknown error.""" + manager = FeedManager() + + # Mock unexpected error + mock_client = Mock() + mock_client._make_request.side_effect = APIRequestError(500, "Server error") + + level = manager.detect_subscription_level(mock_client) + + # Should default to BASIC for safety + assert level == SubscriptionLevel.BASIC + assert manager._detected_subscription_level == SubscriptionLevel.BASIC + + def test_validate_feed_supported_endpoint(self): + """Test validating feed for supported endpoint.""" + manager = FeedManager() + + assert manager.validate_feed("bars", "sip") is True + assert manager.validate_feed("bars", "iex") is True + assert manager.validate_feed("bars", "invalid") is False + + def test_validate_feed_unsupported_endpoint(self): + """Test validating feed for unsupported endpoint.""" + manager = FeedManager() + + assert manager.validate_feed("account", "sip") is False + assert manager.validate_feed("positions", "iex") is False + + def test_validate_feed_with_subscription_level(self): + """Test validating feed with subscription level.""" + config = FeedConfig(subscription_level=SubscriptionLevel.BASIC) + manager = FeedManager(config) + + # Basic can only use IEX + assert manager.validate_feed("bars", "iex") is True + assert manager.validate_feed("bars", "sip") is False + assert manager.validate_feed("bars", "otc") is False + + def test_reset_failures_all(self): + """Test resetting all feed failures.""" + manager = FeedManager() + + # Add some failures + manager._failed_feeds = { + "bars": {FeedType.SIP}, + "bars:AAPL": {FeedType.IEX}, + "quotes": {FeedType.OTC}, + } + + manager.reset_failures() + + assert manager._failed_feeds == {} + + def test_reset_failures_specific_endpoint(self): + """Test resetting failures for specific endpoint.""" + manager = FeedManager() + + # Add some failures + manager._failed_feeds = { + "bars": {FeedType.SIP}, + "bars:AAPL": {FeedType.IEX}, + "bars:MSFT": {FeedType.SIP}, + "quotes": {FeedType.OTC}, + } + + manager.reset_failures("bars") + + # Only bars-related failures should be reset + assert "bars" not in manager._failed_feeds + assert "bars:AAPL" not in manager._failed_feeds + assert "bars:MSFT" not in manager._failed_feeds + assert "quotes" in manager._failed_feeds + + def test_get_available_feeds_unknown_subscription(self): + """Test getting available feeds with unknown subscription.""" + manager = FeedManager() + + feeds = manager.get_available_feeds() + + # Should return all feeds when subscription unknown + assert set(feeds) == {FeedType.SIP, FeedType.IEX, FeedType.OTC} + + def test_get_available_feeds_basic_subscription(self): + """Test getting available feeds with basic subscription.""" + config = FeedConfig(subscription_level=SubscriptionLevel.BASIC) + manager = FeedManager(config) + + feeds = manager.get_available_feeds() + + assert feeds == [FeedType.IEX] + + def test_get_available_feeds_unlimited_subscription(self): + """Test getting available feeds with unlimited subscription.""" + config = FeedConfig(subscription_level=SubscriptionLevel.UNLIMITED) + manager = FeedManager(config) + + feeds = manager.get_available_feeds() + + assert set(feeds) == {FeedType.SIP, FeedType.IEX, FeedType.OTC} + + def test_get_available_feeds_detected_subscription(self): + """Test getting available feeds with detected subscription.""" + manager = FeedManager() + manager._detected_subscription_level = SubscriptionLevel.BASIC + + feeds = manager.get_available_feeds() + + assert feeds == [FeedType.IEX] + + def test_is_feed_available_unknown_subscription(self): + """Test checking feed availability with unknown subscription.""" + manager = FeedManager() + + # All feeds should be available when subscription unknown + assert manager._is_feed_available(FeedType.SIP) is True + assert manager._is_feed_available(FeedType.IEX) is True + assert manager._is_feed_available(FeedType.OTC) is True + + def test_is_feed_available_basic_subscription(self): + """Test checking feed availability with basic subscription.""" + config = FeedConfig(subscription_level=SubscriptionLevel.BASIC) + manager = FeedManager(config) + + assert manager._is_feed_available(FeedType.IEX) is True + assert manager._is_feed_available(FeedType.SIP) is False + assert manager._is_feed_available(FeedType.OTC) is False + + def test_supports_feed_endpoint(self): + """Test checking if endpoint supports feed parameter.""" + manager = FeedManager() + + # Supported endpoints + assert manager._supports_feed("bars") is True + assert manager._supports_feed("/v2/stocks/bars") is True + assert manager._supports_feed("latest/quotes") is True + assert manager._supports_feed("trades") is True + assert manager._supports_feed("snapshots") is True + + # Unsupported endpoints + assert manager._supports_feed("account") is False + assert manager._supports_feed("positions") is False + assert manager._supports_feed("orders") is False diff --git a/tests/test_http/test_feed_manager_integration.py b/tests/test_http/test_feed_manager_integration.py new file mode 100644 index 0000000..9592e4c --- /dev/null +++ b/tests/test_http/test_feed_manager_integration.py @@ -0,0 +1,283 @@ +from __future__ import annotations + +import os +from unittest.mock import Mock + +import pytest + +from py_alpaca_api import PyAlpacaAPI +from py_alpaca_api.exceptions import APIRequestError +from py_alpaca_api.http.feed_manager import ( + FeedConfig, + FeedManager, + FeedType, + SubscriptionLevel, +) + + +@pytest.fixture +def alpaca(): + """Create PyAlpacaAPI client for testing.""" + api_key = os.getenv("ALPACA_API_KEY") + api_secret = os.getenv("ALPACA_SECRET_KEY") + + if not api_key or not api_secret: + pytest.skip("No API credentials found") + + return PyAlpacaAPI( + api_key=api_key, + api_secret=api_secret, + api_paper=True, + ) + + +@pytest.fixture +def feed_manager(): + """Create a feed manager for testing.""" + return FeedManager() + + +class TestFeedManagerIntegration: + """Integration tests for feed manager with live API.""" + + def test_detect_subscription_level_with_live_api(self, alpaca, feed_manager): + """Test detecting subscription level with live API.""" + # Create a mock client that wraps the real client + mock_client = Mock() + + # Try to get a quote with SIP feed to test subscription + try: + _ = alpaca.stock.latest_quote.get("AAPL", feed="sip") + # If we got here, SIP is available + mock_client._make_request.return_value = Mock(status_code=200) + except APIRequestError as e: + # SIP not available + if "subscription" in str(e).lower() or "feed" in str(e).lower(): + mock_client._make_request.side_effect = e + else: + # Different error, skip test + pytest.skip(f"Unexpected error: {e}") + + # Detect subscription level + level = feed_manager.detect_subscription_level(mock_client) + + assert level in [SubscriptionLevel.BASIC, SubscriptionLevel.UNLIMITED] + assert feed_manager._detected_subscription_level == level + + def test_feed_fallback_with_live_api(self, alpaca): + """Test feed fallback behavior with live API.""" + # Try to get quotes with different feeds + feeds_tested = [] + successful_feed = None + + for feed_type in [FeedType.SIP, FeedType.IEX]: + try: + _ = alpaca.stock.latest_quote.get("AAPL", feed=feed_type.value) + feeds_tested.append((feed_type, True)) + successful_feed = feed_type + break + except APIRequestError: + feeds_tested.append((feed_type, False)) + continue + + # At least one feed should work + assert successful_feed is not None, f"No feeds worked: {feeds_tested}" + + def test_feed_manager_with_bars_endpoint(self, alpaca): + """Test feed manager with bars endpoint.""" + manager = FeedManager() + + # Test with bars endpoint + feed = manager.get_feed("bars") + assert feed in ["sip", "iex", "otc"] + + # Try to fetch bars with the suggested feed + try: + bars = alpaca.stock.history.get_stock_data( + symbol="AAPL", + start="2024-01-01", + end="2024-12-31", + timeframe="1d", + limit=10, + feed=feed, + ) + # If successful, feed is appropriate + assert bars is not None + except APIRequestError as e: + # Feed not available, manager should handle this + alternative = manager.handle_feed_error("bars", feed, e, symbol="AAPL") + if alternative: + # Try with alternative feed + bars = alpaca.stock.history.get_stock_data( + symbol="AAPL", + start="2024-01-01", + end="2024-12-31", + timeframe="1d", + limit=10, + feed=alternative, + ) + assert bars is not None + + def test_feed_manager_with_quotes_endpoint(self, alpaca): + """Test feed manager with quotes endpoint.""" + manager = FeedManager() + + # Test with quotes endpoint + feed = manager.get_feed("latest/quotes") + assert feed in ["sip", "iex", "otc"] + + # Try to fetch quote with the suggested feed + try: + quote = alpaca.stock.latest_quote.get("AAPL", feed=feed) + assert quote is not None + except APIRequestError as e: + # Feed not available, manager should handle this + alternative = manager.handle_feed_error( + "latest/quotes", feed, e, symbol="AAPL" + ) + if alternative: + # Try with alternative feed + quote = alpaca.stock.latest_quote.get("AAPL", feed=alternative) + assert quote is not None + + def test_feed_manager_with_trades_endpoint(self, alpaca): + """Test feed manager with trades endpoint.""" + manager = FeedManager() + + # Test with trades endpoint + feed = manager.get_feed("trades") + assert feed in ["sip", "iex", "otc"] + + # Try to fetch trades with the suggested feed + try: + trades = alpaca.stock.trades.get_latest_trade("AAPL", feed=feed) + assert trades is not None + except APIRequestError as e: + # Feed not available, manager should handle this + alternative = manager.handle_feed_error("trades", feed, e, symbol="AAPL") + if alternative: + # Try with alternative feed + trades = alpaca.stock.trades.get_latest_trade("AAPL", feed=alternative) + assert trades is not None + + def test_feed_validation_with_live_data(self, alpaca): + """Test feed validation based on actual API access.""" + manager = FeedManager() + + # Test validation for bars endpoint + assert manager.validate_feed("bars", "iex") is True + assert manager.validate_feed("bars", "invalid_feed") is False + + # Test validation for non-feed endpoint + assert manager.validate_feed("account", "iex") is False + + def test_feed_manager_caching_behavior(self, alpaca): + """Test that feed manager caches failed feeds appropriately.""" + manager = FeedManager( + FeedConfig( + preferred_feed=FeedType.SIP, + fallback_feeds=[FeedType.IEX], + ) + ) + + # First request + feed1 = manager.get_feed("bars", symbol="AAPL") + + # Simulate a failure if using SIP + if feed1 == "sip": + try: + _ = alpaca.stock.history.get_stock_data( + symbol="AAPL", + start="2024-01-01", + end="2024-12-31", + timeframe="1d", + limit=1, + feed=feed1, + ) + except APIRequestError as e: + # Handle the error + alternative = manager.handle_feed_error("bars", feed1, e, symbol="AAPL") + + # Second request should return alternative directly + feed2 = manager.get_feed("bars", symbol="AAPL") + assert feed2 in {alternative, "iex"} + + def test_feed_manager_reset_failures(self): + """Test resetting feed failures.""" + manager = FeedManager() + + # Add some failures + error = APIRequestError(403, "Access denied") + manager.handle_feed_error("bars", "sip", error, symbol="AAPL") + manager.handle_feed_error("quotes", "sip", error) + + assert len(manager._failed_feeds) > 0 + + # Reset all failures + manager.reset_failures() + assert len(manager._failed_feeds) == 0 + + def test_multiple_symbols_with_feed_manager(self, alpaca): + """Test feed manager with multiple symbols.""" + manager = FeedManager() + + symbols = ["AAPL", "GOOGL", "MSFT"] + successful_fetches = [] + + for symbol in symbols: + feed = manager.get_feed("latest/quotes", symbol=symbol) + + try: + _ = alpaca.stock.latest_quote.get(symbol, feed=feed) + successful_fetches.append((symbol, feed, True)) + except APIRequestError as e: + # Try fallback + alternative = manager.handle_feed_error( + "latest/quotes", feed, e, symbol=symbol + ) + if alternative: + try: + _ = alpaca.stock.latest_quote.get(symbol, feed=alternative) + successful_fetches.append((symbol, alternative, True)) + except APIRequestError: + successful_fetches.append((symbol, alternative, False)) + else: + successful_fetches.append((symbol, feed, False)) + + # At least some symbols should succeed + successful_count = sum(1 for _, _, success in successful_fetches if success) + assert ( + successful_count > 0 + ), f"Failed to fetch any symbols: {successful_fetches}" + + def test_feed_config_endpoint_specific(self, alpaca): + """Test endpoint-specific feed configuration.""" + config = FeedConfig( + preferred_feed=FeedType.SIP, + endpoint_feeds={ + "latest/quotes": FeedType.IEX, + "bars": FeedType.SIP, + }, + ) + manager = FeedManager(config) + + # Check that endpoint-specific config is used + assert manager.get_feed("latest/quotes") == "iex" + assert manager.get_feed("bars") == "sip" + assert manager.get_feed("trades") == "sip" # Uses default + + def test_subscription_level_affects_available_feeds(self): + """Test that subscription level affects available feeds.""" + # Test with BASIC subscription + config_basic = FeedConfig(subscription_level=SubscriptionLevel.BASIC) + manager_basic = FeedManager(config_basic) + + available_basic = manager_basic.get_available_feeds() + assert available_basic == [FeedType.IEX] + + # Test with UNLIMITED subscription + config_unlimited = FeedConfig(subscription_level=SubscriptionLevel.UNLIMITED) + manager_unlimited = FeedManager(config_unlimited) + + available_unlimited = manager_unlimited.get_available_feeds() + assert set(available_unlimited) == {FeedType.SIP, FeedType.IEX, FeedType.OTC} diff --git a/tests/test_stock/test_trades_live.py b/tests/test_stock/test_trades_live.py index e761edd..bc8c206 100644 --- a/tests/test_stock/test_trades_live.py +++ b/tests/test_stock/test_trades_live.py @@ -255,8 +255,14 @@ def test_get_all_trades(self, alpaca): print(f"\nRetrieved {len(all_trades)} total trades across all pages") # Check trades are in chronological order - timestamps = [trade.timestamp for trade in all_trades] - assert timestamps == sorted(timestamps) + # Parse timestamps to handle different precision levels + parsed_timestamps = [ + datetime.fromisoformat(trade.timestamp.replace("Z", "+00:00")) + for trade in all_trades + ] + assert parsed_timestamps == sorted( + parsed_timestamps + ), "Trades are not in chronological order" print(" Trades are in chronological order ✓") except APIRequestError as e: