diff --git a/actions/AdSchedule.py b/actions/AdSchedule.py index b2c61fa..ce92c68 100644 --- a/actions/AdSchedule.py +++ b/actions/AdSchedule.py @@ -2,7 +2,9 @@ from datetime import datetime, timedelta from threading import Thread from time import sleep +from typing import Any, List +from gi.repository import GLib from GtkHelper.GenerativeUI.SwitchRow import SwitchRow from .TwitchCore import TwitchCore from src.backend.PluginManager.EventAssigner import EventAssigner @@ -11,6 +13,12 @@ from loguru import logger as log +from ..constants import ( + AD_SCHEDULE_FETCH_INTERVAL_SECONDS, + AD_DISPLAY_UPDATE_INTERVAL_SECONDS, + ERROR_DISPLAY_DURATION_SECONDS, +) + class Icons(StrEnum): DELAY = "delay" @@ -23,7 +31,7 @@ class Colors(StrEnum): class AdSchedule(TwitchCore): - def __init__(self, *args, **kwargs): + def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.icon_keys = [Icons.DELAY] self.current_icon = self.get_icon(Icons.DELAY) @@ -34,14 +42,13 @@ def __init__(self, *args, **kwargs): self._next_ad: datetime = datetime.now() self._snoozes: int = -1 - def on_ready(self): + def on_ready(self) -> None: super().on_ready() Thread( - target=self._get_ad_schedule, daemon=True, name="get_ad_schedule").start() - Thread( - target=self._update_ad_timer, daemon=True, name="update_ad_timer").start() + target=self._update_ad_display, daemon=True, name="update_ad_display" + ).start() - def create_event_assigners(self): + def create_event_assigners(self) -> None: self.event_manager.add_event_assigner( EventAssigner( id="snooze-ad", @@ -51,75 +58,93 @@ def create_event_assigners(self): ) ) - def create_generative_ui(self): + def create_generative_ui(self) -> None: self._skip_ad_switch = SwitchRow( action_core=self, var_name="ad.snooze", default_value=True, title="ad-snooze", subtitle="Snoozes ad for 5 minutes when pushed", - complex_var_name=True + complex_var_name=True, ) - def get_config_rows(self): + def get_config_rows(self) -> List[Any]: return [self._skip_ad_switch.widget] - def _update_background_color(self, color: str): - self.current_color = self.get_color(color) - self.display_color() + def _update_background_color(self, color: str) -> None: + def _update(): + self.current_color = self.get_color(color) + self.display_color() + + GLib.idle_add(_update) + + def _update_ad_display(self) -> None: + """Consolidated update loop that fetches ad schedule and updates display.""" + last_fetch_time = datetime.now() - timedelta( + seconds=AD_SCHEDULE_FETCH_INTERVAL_SECONDS + ) # Fetch immediately on start - def _update_ad_timer(self): while self.get_is_present(): self.display_color() now = datetime.now() - self.set_bottom_label( - str(self._snoozes) if (self._snoozes >= 0 and self._skip_ad_switch.get_active()) else "") + + # Fetch ad schedule every 30 seconds + if ( + now - last_fetch_time + ).total_seconds() >= AD_SCHEDULE_FETCH_INTERVAL_SECONDS: + try: + schedule, snoozes = self.backend.get_next_ad() + self._next_ad = schedule + self._snoozes = snoozes + last_fetch_time = now + except Exception as ex: + log.error(f"Failed to get ad schedule from Twitch API: {ex}") + self.show_error(ERROR_DISPLAY_DURATION_SECONDS) + + # Update display every second + snooze_label = ( + str(self._snoozes) + if (self._snoozes >= 0 and self._skip_ad_switch.get_active()) + else "" + ) + GLib.idle_add(lambda: self.set_bottom_label(snooze_label)) + try: if self._next_ad < now: self._update_background_color(Colors.DEFAULT) - self.set_center_label("") + GLib.idle_add(lambda: self.set_center_label("")) + sleep(AD_DISPLAY_UPDATE_INTERVAL_SECONDS) continue diff = (self._next_ad - now).total_seconds() - self.set_center_label(self._convert_seconds_to_hh_mm_ss(diff)) + time_label = self._convert_seconds_to_hh_mm_ss(diff) + GLib.idle_add(lambda: self.set_center_label(time_label)) if diff <= 60: self._update_background_color(Colors.ALERT) - continue - if diff <= 300: + elif diff <= 300: self._update_background_color(Colors.WARNING) - continue - self._update_background_color(Colors.DEFAULT) + else: + self._update_background_color(Colors.DEFAULT) except TypeError: # There is a known issue where the default timestamp returned from # the twitch API is an invalid datetime object and causes an error. # Ignoring it here pass except Exception as ex: - log.error(ex) - sleep(1) + log.error(f"Failed to update ad timer display: {ex}") - def _get_ad_schedule(self): - while self.get_is_present(): - try: - schedule, snoozes = self.backend.get_next_ad() - self._next_ad = schedule - self._snoozes = snoozes - self._update_ad_timer() - except Exception as ex: - log.error(ex) - self.show_error(3) - sleep(30) + sleep(AD_DISPLAY_UPDATE_INTERVAL_SECONDS) - def _convert_seconds_to_hh_mm_ss(self, seconds) -> str: + def _convert_seconds_to_hh_mm_ss(self, seconds: float) -> str: hours = seconds // 3600 minutes = (seconds % 3600) // 60 remaining_seconds = seconds % 60 return f"{int(hours):02}:{int(minutes):02}:{int(remaining_seconds):02}" - def _on_snooze_ad(self, _): + def _on_snooze_ad(self, _: Any) -> None: if not self._skip_ad_switch.get_active(): return try: self.backend.snooze_ad() except Exception as ex: - log.error(ex) - self.show_error(3) + log.error(f"Failed to snooze next ad: {ex}") + self.show_error(ERROR_DISPLAY_DURATION_SECONDS) diff --git a/actions/ChatMode.py b/actions/ChatMode.py index a91c2c5..8fe31fd 100644 --- a/actions/ChatMode.py +++ b/actions/ChatMode.py @@ -1,7 +1,9 @@ from enum import StrEnum, Enum from threading import Thread from time import sleep +from typing import Any, List, Optional +from gi.repository import GLib from .TwitchCore import TwitchCore from src.backend.PluginManager.EventAssigner import EventAssigner from src.backend.PluginManager.InputBases import Input @@ -10,6 +12,11 @@ from loguru import logger as log +from ..constants import ( + CHAT_MODE_UPDATE_INTERVAL_SECONDS, + ERROR_DISPLAY_DURATION_SECONDS, +) + class Icons(StrEnum): FOLLOWER = "follower_mode" @@ -26,14 +33,13 @@ class ChatModeOptions(Enum): class ChatMode(TwitchCore): - def __init__(self, *args, **kwargs): + def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) - self.icon_keys = [Icons.FOLLOWER, - Icons.SUBSCRIBER, Icons.EMOTE, Icons.SLOW] + self.icon_keys = [Icons.FOLLOWER, Icons.SUBSCRIBER, Icons.EMOTE, Icons.SLOW] self.current_icon = self.get_icon(Icons.FOLLOWER) self.icon_name = Icons.FOLLOWER - def create_event_assigners(self): + def create_event_assigners(self) -> None: self.event_manager.add_event_assigner( EventAssigner( id="chat-toggle", @@ -43,7 +49,7 @@ def create_event_assigners(self): ) ) - def create_generative_ui(self): + def create_generative_ui(self) -> None: self._chat_select_row = ComboRow( action_core=self, var_name="chat.mode", @@ -56,42 +62,48 @@ def create_generative_ui(self): ], title="chat-toggle-dropdown", complex_var_name=True, - on_change=self._change_chat_mode + on_change=self._change_chat_mode, ) - def on_ready(self): + def on_ready(self) -> None: Thread( - target=self._update_chat_mode, daemon=True, name="update_chat_mode").start() + target=self._update_chat_mode, daemon=True, name="update_chat_mode" + ).start() - def get_config_rows(self): + def get_config_rows(self) -> List[Any]: return [self._chat_select_row.widget] - def _change_chat_mode(self, _, new, __): + def _change_chat_mode(self, _: Any, new: str, __: Any) -> None: self.icon_name = Icons(new) self.current_icon = self.get_icon(self.icon_name) self.display_icon() - def _update_icon(self, mode: str, enabled: bool): + def _update_icon(self, mode: str, enabled: bool) -> None: # TODO: Custom icons for enabled/disabled - self.set_center_label("Enabled" if enabled else "Disabled") + label = "Enabled" if enabled else "Disabled" + GLib.idle_add(lambda: self.set_center_label(label)) - def _update_chat_mode(self): + def _update_chat_mode(self) -> None: while self.get_is_present(): + mode: Optional[str] = None try: chat_settings = self.backend.get_chat_settings() mode = self._chat_select_row.get_selected_item().get_value() - enabled = chat_settings.get(mode) - self._update_icon(mode, enabled) + if mode: + enabled = chat_settings.get(mode) + self._update_icon(mode, enabled) except Exception as ex: - log.error(ex) - self.show_error(3) - sleep(5) + log.error( + f"Failed to update chat mode status{f' for {mode}' if mode else ''}: {ex}" + ) + self.show_error(ERROR_DISPLAY_DURATION_SECONDS) + sleep(CHAT_MODE_UPDATE_INTERVAL_SECONDS) - def _on_toggle_chat(self, _): + def _on_toggle_chat(self, _: Any) -> None: item = self._chat_select_row.get_selected_item().get_value() try: resp = self.backend.toggle_chat_mode(item) self._update_icon(item, resp) except Exception as ex: - log.error(ex) - self.show_error(3) + log.error(f"Failed to toggle chat mode '{item}': {ex}") + self.show_error(ERROR_DISPLAY_DURATION_SECONDS) diff --git a/actions/Clip.py b/actions/Clip.py index 8b23c80..09cd87b 100644 --- a/actions/Clip.py +++ b/actions/Clip.py @@ -1,4 +1,5 @@ from enum import StrEnum +from typing import Any from loguru import logger as log @@ -6,32 +7,34 @@ from src.backend.PluginManager.EventAssigner import EventAssigner from src.backend.PluginManager.InputBases import Input +from ..constants import ERROR_DISPLAY_DURATION_SECONDS + class Icons(StrEnum): CLIP = "camera" class Clip(TwitchCore): - def __init__(self, *args, **kwargs): + def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.icon_keys = [Icons.CLIP] self.current_icon = self.get_icon(Icons.CLIP) self.icon_name = Icons.CLIP self.has_configuration = False - def create_event_assigners(self): + def create_event_assigners(self) -> None: self.event_manager.add_event_assigner( EventAssigner( id="clip", ui_label="Clip", default_event=Input.Key.Events.DOWN, - callback=self._on_clip + callback=self._on_clip, ) ) - def _on_clip(self, _): + def _on_clip(self, _: Any) -> None: try: self.backend.create_clip() except Exception as ex: - log.error(ex) - self.show_error(3) + log.error(f"Failed to create clip: {ex}") + self.show_error(ERROR_DISPLAY_DURATION_SECONDS) diff --git a/actions/Marker.py b/actions/Marker.py index 05a178b..e05a3a6 100644 --- a/actions/Marker.py +++ b/actions/Marker.py @@ -1,4 +1,5 @@ from enum import StrEnum +from typing import Any from loguru import logger as log @@ -6,32 +7,34 @@ from src.backend.PluginManager.EventAssigner import EventAssigner from src.backend.PluginManager.InputBases import Input +from ..constants import ERROR_DISPLAY_DURATION_SECONDS + class Icons(StrEnum): MARKER = "bookmark" class Marker(TwitchCore): - def __init__(self, *args, **kwargs): + def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.icon_keys = [Icons.MARKER] self.current_icon = self.get_icon(Icons.MARKER) self.icon_name = Icons.MARKER self.has_configuration = False - def create_event_assigners(self): + def create_event_assigners(self) -> None: self.event_manager.add_event_assigner( EventAssigner( id="marker", ui_label="Marker", default_event=Input.Key.Events.DOWN, - callback=self._on_marker + callback=self._on_marker, ) ) - def _on_marker(self, _): + def _on_marker(self, _: Any) -> None: try: self.backend.create_marker() except Exception as ex: - log.error(ex) - self.show_error(3) + log.error(f"Failed to create stream marker: {ex}") + self.show_error(ERROR_DISPLAY_DURATION_SECONDS) diff --git a/actions/PlayAd.py b/actions/PlayAd.py index 5aff501..df373a9 100644 --- a/actions/PlayAd.py +++ b/actions/PlayAd.py @@ -1,4 +1,5 @@ from enum import StrEnum, Enum +from typing import Any, List, Optional from loguru import logger as log @@ -8,48 +9,55 @@ from GtkHelper.GenerativeUI.ComboRow import ComboRow from GtkHelper.ComboRow import SimpleComboRowItem, BaseComboRowItem +from ..constants import ERROR_DISPLAY_DURATION_SECONDS + class Icons(StrEnum): AD = "money" class PlayAd(TwitchCore): - def __init__(self, *args, **kwargs): + def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.icon_keys = [Icons.AD] self.current_icon = self.get_icon(Icons.AD) self.icon_name = Icons.AD self.has_configuration = True - def create_generative_ui(self): - options = [SimpleComboRowItem(str(x), f"{x} seconds") - for x in [30, 60, 90, 120]] + def create_generative_ui(self) -> None: + options = [ + SimpleComboRowItem(str(x), f"{x} seconds") for x in [30, 60, 90, 120] + ] self._time_row = ComboRow( action_core=self, var_name="ad.duration", default_value=options[0], items=options, title="ad-options-dropdown", - complex_var_name=True + complex_var_name=True, ) - def get_config_rows(self): + def get_config_rows(self) -> List[Any]: return [self._time_row.widget] - def create_event_assigners(self): + def create_event_assigners(self) -> None: self.event_manager.add_event_assigner( EventAssigner( id="play-ad", ui_label="Play Ad", default_event=Input.Key.Events.DOWN, - callback=self._on_play_ad + callback=self._on_play_ad, ) ) - def _on_play_ad(self, _): + def _on_play_ad(self, _: Any) -> None: + time: Optional[str] = None try: time = self._time_row.get_selected_item().get_value() - self.backend.play_ad(int(time)) + if time: + self.backend.play_ad(int(time)) except Exception as ex: - log.error(ex) - self.show_error(3) + log.error( + f"Failed to play ad{f' (duration: {time}s)' if time else ''}: {ex}" + ) + self.show_error(ERROR_DISPLAY_DURATION_SECONDS) diff --git a/actions/SendMessage.py b/actions/SendMessage.py index b099368..5787baa 100644 --- a/actions/SendMessage.py +++ b/actions/SendMessage.py @@ -1,4 +1,5 @@ from enum import StrEnum +from typing import Any, List from .TwitchCore import TwitchCore from src.backend.PluginManager.EventAssigner import EventAssigner @@ -8,30 +9,32 @@ from loguru import logger as log +from ..constants import ERROR_DISPLAY_DURATION_SECONDS + class Icons(StrEnum): CHAT = "chat" class SendMessage(TwitchCore): - def __init__(self, *args, **kwargs): + def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.icon_keys = [Icons.CHAT] self.current_icon = self.get_icon(Icons.CHAT) self.icon_name = Icons.CHAT self.has_configuration = True - def create_event_assigners(self): + def create_event_assigners(self) -> None: self.event_manager.add_event_assigner( EventAssigner( id="chat", ui_label="Message", default_event=Input.Key.Events.DOWN, - callback=self._on_chat + callback=self._on_chat, ) ) - def create_generative_ui(self): + def create_generative_ui(self) -> None: self.message_row = EntryRow( action_core=self, var_name="chat.message_text", @@ -49,14 +52,14 @@ def create_generative_ui(self): complex_var_name=True, ) - def get_config_rows(self): + def get_config_rows(self) -> List[Any]: return [self.message_row.widget, self.channel_row.widget] - def _on_chat(self, _): + def _on_chat(self, _: Any) -> None: message = self.message_row.get_value() channel = self.channel_row.get_value() try: self.backend.send_message(message, channel) except Exception as ex: - log.error(ex) - self.show_error(3) + log.error(f"Failed to send chat message to channel '{channel}': {ex}") + self.show_error(ERROR_DISPLAY_DURATION_SECONDS) diff --git a/actions/ShowViewers.py b/actions/ShowViewers.py index 3893bec..9d15a56 100644 --- a/actions/ShowViewers.py +++ b/actions/ShowViewers.py @@ -1,33 +1,36 @@ from enum import StrEnum from threading import Thread from time import sleep +from typing import Optional, Union +from gi.repository import GLib from .TwitchCore import TwitchCore from src.backend.PluginManager.EventAssigner import EventAssigner from src.backend.PluginManager.InputBases import Input from loguru import logger as log +from ..constants import VIEWER_UPDATE_INTERVAL_SECONDS + class Icons(StrEnum): VIEWERS = "view" class ShowViewers(TwitchCore): - def __init__(self, *args, **kwargs): + def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.icon_keys = [Icons.VIEWERS] self.current_icon = self.get_icon(Icons.VIEWERS) self.icon_name = Icons.VIEWERS - def on_ready(self): - Thread( - target=self._update_viewers, daemon=True, name="update_viewers").start() + def on_ready(self) -> None: + Thread(target=self._update_viewers, daemon=True, name="update_viewers").start() - def _update_viewers(self): + def _update_viewers(self) -> None: while self.get_is_present(): - count = self.backend.get_viewers() + count: Union[Optional[int], str] = self.backend.get_viewers() if not count: count = "-" - self.set_center_label(str(count)) - sleep(10) + GLib.idle_add(lambda c=count: self.set_center_label(str(c))) + sleep(VIEWER_UPDATE_INTERVAL_SECONDS) diff --git a/actions/TwitchCore.py b/actions/TwitchCore.py index 3f0bcd3..ae3bf4c 100644 --- a/actions/TwitchCore.py +++ b/actions/TwitchCore.py @@ -2,6 +2,7 @@ from src.backend.PluginManager.ActionCore import ActionCore from src.backend.DeckManagement.InputIdentifier import InputEvent, Input from src.backend.PluginManager.PluginSettings.Asset import Color, Icon +from typing import Optional, Any from gi.repository import Gtk, Adw import gi @@ -11,17 +12,17 @@ class TwitchCore(ActionCore): - def __init__(self, *args, **kwargs): + def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) # Setup AssetManager values - self.icon_keys = [] - self.color_keys = [] - self.current_icon: Icon = None - self.current_color: Color = None - self.icon_name = "" - self.color_name = "" - self.backend: 'Backend' = self.plugin_base.backend + self.icon_keys: list[str] = [] + self.color_keys: list[str] = [] + self.current_icon: Optional[Icon] = None + self.current_color: Optional[Color] = None + self.icon_name: str = "" + self.color_name: str = "" + self.backend: Any = self.plugin_base.backend self.plugin_base.asset_manager.icons.add_listener(self._icon_changed) self.plugin_base.asset_manager.colors.add_listener(self._color_changed) @@ -30,11 +31,53 @@ def __init__(self, *args, **kwargs): self.create_generative_ui() self.create_event_assigners() - def on_ready(self): + def on_ready(self) -> None: super().on_ready() self.display_icon() self.display_color() + def create_generative_ui(self) -> None: + pass + + def create_event_assigners(self) -> None: + pass + + def display_icon(self) -> None: + if not self.current_icon: + return + _, rendered = self.current_icon.get_values() + if rendered: + self.set_media(image=rendered) + + async def _icon_changed(self, event: str, key: str, asset: Any) -> None: + if not key in self.icon_keys: + return + if key != self.icon_name: + return + self.current_icon = asset + self.icon_name = key + self.display_icon() + + def display_color(self) -> None: + if not self.current_color: + return + color = self.current_color.get_values() + try: + self.set_background_color(color) + except: + # Sometimes we try to call this too early, and it leads to + # console errors, but no real errors. Ignoring this for now + pass + + async def _color_changed(self, event: str, key: str, asset: Any) -> None: + if not key in self.color_keys: + return + if key != self.color_name: + return + self.current_color = asset + self.color_name = key + self.display_color() + def create_generative_ui(self): pass diff --git a/constants.py b/constants.py new file mode 100644 index 0000000..77000cc --- /dev/null +++ b/constants.py @@ -0,0 +1,22 @@ +""" +Constants used throughout the Twitch plugin. +""" + +# Update intervals (in seconds) +VIEWER_UPDATE_INTERVAL_SECONDS = 10 +AD_SCHEDULE_FETCH_INTERVAL_SECONDS = 30 +AD_DISPLAY_UPDATE_INTERVAL_SECONDS = 1 +CHAT_MODE_UPDATE_INTERVAL_SECONDS = 5 + +# Error display +ERROR_DISPLAY_DURATION_SECONDS = 3 + +# OAuth/Authentication +OAUTH_REDIRECT_URI = "http://localhost:3000/auth" +OAUTH_PORT = 3000 + +# Rate Limiting +# Twitch API standard rate limit: 800 requests per minute +# Using conservative limit to avoid hitting the cap +RATE_LIMIT_CALLS = 100 +RATE_LIMIT_PERIOD = 60 # seconds diff --git a/main.py b/main.py index d25aaea..169c6f1 100644 --- a/main.py +++ b/main.py @@ -1,6 +1,7 @@ import os import globals as gl import json +from typing import Optional, Callable, Any from loguru import logger from gi.repository import Gtk @@ -24,11 +25,23 @@ class PluginTemplate(PluginBase): + """Twitch StreamController Plugin. + + Provides integration with Twitch for StreamController, enabling actions like: + - Creating clips and stream markers + - Sending chat messages + - Displaying viewer counts + - Managing chat modes (follower-only, emote-only, slow mode, etc.) + - Playing ads and managing ad schedules + + All Twitch API calls are rate-limited to prevent exceeding API limits. + """ + def get_selector_icon(self) -> Gtk.Widget: _, rendered = self.asset_manager.icons.get_asset_values("main") return Gtk.Image.new_from_pixbuf(image2pixbuf(rendered)) - def _add_icons(self): + def _add_icons(self) -> None: self.add_icon("main", self.get_asset_path("glitch_flat_purple.png")) self.add_icon("chat", self.get_asset_path("chat.png")) self.add_icon("camera", self.get_asset_path("camera.png")) @@ -41,12 +54,12 @@ def _add_icons(self): self.add_icon("money", self.get_asset_path("money.png")) self.add_icon("delay", self.get_asset_path("delay.png")) - def _add_colors(self): + def _add_colors(self) -> None: self.add_color("default", [0, 0, 0, 0]) self.add_color("warning", [255, 244, 79, 255]) self.add_color("alert", [224, 102, 102, 255]) - def _register_actions(self): + def _register_actions(self) -> None: self.message_action_holder = ActionHolder( plugin_base=self, action_base=SendMessage, @@ -56,7 +69,7 @@ def _register_actions(self): Input.Key: ActionInputSupport.SUPPORTED, Input.Dial: ActionInputSupport.UNTESTED, Input.Touchscreen: ActionInputSupport.UNTESTED, - } + }, ) self.add_action_holder(self.message_action_holder) @@ -69,7 +82,7 @@ def _register_actions(self): Input.Key: ActionInputSupport.SUPPORTED, Input.Dial: ActionInputSupport.UNTESTED, Input.Touchscreen: ActionInputSupport.UNTESTED, - } + }, ) self.add_action_holder(self.clip_action_holder) @@ -82,7 +95,7 @@ def _register_actions(self): Input.Key: ActionInputSupport.SUPPORTED, Input.Dial: ActionInputSupport.UNTESTED, Input.Touchscreen: ActionInputSupport.UNTESTED, - } + }, ) self.add_action_holder(self.viewers_action_holder) @@ -95,7 +108,7 @@ def _register_actions(self): Input.Key: ActionInputSupport.SUPPORTED, Input.Dial: ActionInputSupport.UNTESTED, Input.Touchscreen: ActionInputSupport.UNTESTED, - } + }, ) self.add_action_holder(self.marker_actions_holder) @@ -108,7 +121,7 @@ def _register_actions(self): Input.Key: ActionInputSupport.SUPPORTED, Input.Dial: ActionInputSupport.UNTESTED, Input.Touchscreen: ActionInputSupport.UNTESTED, - } + }, ) self.add_action_holder(self.chatmode_actions_holder) @@ -121,7 +134,7 @@ def _register_actions(self): Input.Key: ActionInputSupport.SUPPORTED, Input.Dial: ActionInputSupport.UNTESTED, Input.Touchscreen: ActionInputSupport.UNTESTED, - } + }, ) self.add_action_holder(self.playad_action_holder) @@ -134,72 +147,87 @@ def _register_actions(self): Input.Key: ActionInputSupport.SUPPORTED, Input.Dial: ActionInputSupport.UNTESTED, Input.Touchscreen: ActionInputSupport.UNTESTED, - } + }, ) self.add_action_holder(self.ad_schedule_action_holder) - def _setup_backend(self): + def _setup_backend(self) -> bool: backend_path = os.path.join(self.PATH, "twitch_backend.py") - self.launch_backend(backend_path=backend_path, open_in_terminal=False, - venv_path=os.path.join(self.PATH, ".venv")) - self.wait_for_backend(tries=5) + self.launch_backend( + backend_path=backend_path, + open_in_terminal=False, + venv_path=os.path.join(self.PATH, ".venv"), + ) + backend_ready = self.wait_for_backend(tries=5) + if not backend_ready: + logger.error("Failed to initialize Twitch backend after 5 attempts") settings = self.get_settings() - client_id = settings.get('client_id', '') - client_secret = settings.get('client_secret', '') - auth_code = settings.get('auth_code', '') + client_id = settings.get("client_id", "") + client_secret = settings.get("client_secret", "") + auth_code = settings.get("auth_code", "") settings_path = os.path.join( - gl.DATA_PATH, 'settings', 'plugins', self.get_plugin_id_from_folder_name()) + gl.DATA_PATH, "settings", "plugins", self.get_plugin_id_from_folder_name() + ) os.makedirs(settings_path, exist_ok=True) self.backend.set_token_path(os.path.join(settings_path, "keys.json")) if client_id and client_secret and auth_code: - self.backend.auth_with_code( - client_id, client_secret, auth_code) + self.backend.auth_with_code(client_id, client_secret, auth_code) + return True - def __init__(self): + def __init__(self) -> None: super().__init__(use_legacy_locale=False) self.has_plugin_settings = True self.lm = self.locale_manager self.lm.set_to_os_default() - self._settings_manager = PluginSettings(self) - self.auth_callback_fn: callable = None + self._settings_manager: PluginSettings = PluginSettings(self) + self.auth_callback_fn: Optional[Callable[[bool, str], None]] = None + self.backend_initialized: bool = False self._add_icons() self._add_colors() - self._setup_backend() + self.backend_initialized = self._setup_backend() + if not self.backend_initialized: + logger.warning( + "Twitch plugin loaded but backend failed to initialize. Please check settings." + ) self._register_actions() try: - with open(os.path.join(self.PATH, "manifest.json"), "r", encoding="UTF-8") as f: + with open( + os.path.join(self.PATH, "manifest.json"), "r", encoding="UTF-8" + ) as f: data = json.load(f) except Exception as ex: logger.error(ex) data = {} app_manifest = { "plugin_version": data.get("version", "0.0.0"), - "app_version": data.get("app-version", "0.0.0") + "app_version": data.get("app-version", "0.0.0"), } self.register( plugin_name="Twitch Integration", github_repo="https://github.com/imdevinc/StreamControllerTwitchPlugin", plugin_version=app_manifest.get("plugin_version"), - app_version=app_manifest.get("app_version") + app_version=app_manifest.get("app_version"), ) self.add_css_stylesheet(os.path.join(self.PATH, "style.css")) - def save_auth_settings(self, client_id: str, client_secret: str, auth_code: str) -> None: + def save_auth_settings( + self, client_id: str, client_secret: str, auth_code: str + ) -> None: settings = self.get_settings() - settings['client_id'] = client_id - settings['client_secret'] = client_secret - settings['auth_code'] = auth_code + settings["client_id"] = client_id + settings["client_secret"] = client_secret + settings["auth_code"] = auth_code self.set_settings(settings) def on_auth_callback(self, success: bool, message: str = "") -> None: if self.auth_callback_fn: self.auth_callback_fn(success, message) - def get_settings_area(self): + def get_settings_area(self) -> Any: return self._settings_manager.get_settings_area() diff --git a/manifest.json b/manifest.json index e332618..fe10a32 100644 --- a/manifest.json +++ b/manifest.json @@ -1,5 +1,5 @@ { - "version": "1.6.0", + "version": "1.7.0", "thumbnail": "store/thumbnail.png", "id": "com_imdevinc_StreamControllerTwitchPlugin", "name": "Twitch Integration", diff --git a/settings.py b/settings.py index e082a09..5dbc6f7 100644 --- a/settings.py +++ b/settings.py @@ -1,5 +1,6 @@ from gi.repository import Gtk, Adw import gi +from typing import Any from loguru import logger @@ -18,40 +19,54 @@ class PluginSettings: _client_secret: Adw.PasswordEntryRow _auth_button: Gtk.Button - def __init__(self, plugin_base: PluginBase): - self._plugin_base = plugin_base + def __init__(self, plugin_base: PluginBase) -> None: + self._plugin_base: PluginBase = plugin_base def get_settings_area(self) -> Adw.PreferencesGroup: - if not self._plugin_base.backend.is_authed(): - self._status_label = Gtk.Label(label=self._plugin_base.lm.get( - "actions.base.credentials.failed"), css_classes=["twitch-controller-red"]) + if not self._plugin_base.backend or not self._plugin_base.backend_initialized: + self._status_label = Gtk.Label( + label=self._plugin_base.lm.get("actions.base.credentials.failed"), + css_classes=["twitch-controller-red"], + ) + elif not self._plugin_base.backend.is_authed(): + self._status_label = Gtk.Label( + label=self._plugin_base.lm.get("actions.base.credentials.failed"), + css_classes=["twitch-controller-red"], + ) else: - self._status_label = Gtk.Label(label=self._plugin_base.lm.get( - "actions.base.credentials.authenticated"), css_classes=["twitch-controller-green"]) + self._status_label = Gtk.Label( + label=self._plugin_base.lm.get( + "actions.base.credentials.authenticated" + ), + css_classes=["twitch-controller-green"], + ) self._client_id = Adw.EntryRow( - title=self._plugin_base.lm.get("actions.base.twitch_client_id")) + title=self._plugin_base.lm.get("actions.base.twitch_client_id") + ) self._client_secret = Adw.PasswordEntryRow( - title=self._plugin_base.lm.get("actions.base.twitch_client_secret")) + title=self._plugin_base.lm.get("actions.base.twitch_client_secret") + ) self._auth_button = Gtk.Button( - label=self._plugin_base.lm.get("actions.base.credentials.validate")) + label=self._plugin_base.lm.get("actions.base.credentials.validate") + ) self._auth_button.set_margin_top(10) self._auth_button.set_margin_bottom(10) self._client_id.connect("notify::text", self._on_change_client_id) - self._client_secret.connect( - "notify::text", self._on_change_client_secret) + self._client_secret.connect("notify::text", self._on_change_client_secret) self._auth_button.connect("clicked", self._on_auth_clicked) gh_link_label = self._plugin_base.lm.get("actions.info.link.label") gh_link_text = self._plugin_base.lm.get("actions.info.link.text") gh_label = Gtk.Label( - use_markup=True, label=f"{gh_link_label} {gh_link_text}") + use_markup=True, + label=f'{gh_link_label} {gh_link_text}', + ) self._load_settings() self._enable_auth() pref_group = Adw.PreferencesGroup() - pref_group.set_title(self._plugin_base.lm.get( - "actions.base.credentials.title")) + pref_group.set_title(self._plugin_base.lm.get("actions.base.credentials.title")) pref_group.add(self._status_label) pref_group.add(self._client_id) pref_group.add(self._client_secret) @@ -59,34 +74,34 @@ def get_settings_area(self) -> Adw.PreferencesGroup: pref_group.add(gh_label) return pref_group - def _load_settings(self): + def _load_settings(self) -> None: settings = self._plugin_base.get_settings() client_id = settings.get(KEY_CLIENT_ID, "") client_secret = settings.get(KEY_CLIENT_SECRET, "") self._client_id.set_text(client_id) self._client_secret.set_text(client_secret) - def _update_status(self, message: str, is_error: bool): + def _update_status(self, message: str, is_error: bool) -> None: style = "twitch-controller-red" if is_error else "twitch-controller-green" self._status_label.set_text(message) self._status_label.set_css_classes([style]) - def _update_settings(self, key: str, value: str): + def _update_settings(self, key: str, value: str) -> None: settings = self._plugin_base.get_settings() settings[key] = value self._plugin_base.set_settings(settings) - def _on_change_client_id(self, entry, _): + def _on_change_client_id(self, entry: Any, _: Any) -> None: val = entry.get_text().strip() self._update_settings(KEY_CLIENT_ID, val) self._enable_auth() - def _on_change_client_secret(self, entry, _): + def _on_change_client_secret(self, entry: Any, _: Any) -> None: val = entry.get_text().strip() self._update_settings(KEY_CLIENT_SECRET, val) self._enable_auth() - def _on_auth_clicked(self, _): + def _on_auth_clicked(self, _: Any) -> None: if not self._plugin_base.backend: self._update_status("Failed to load backend", True) return @@ -94,20 +109,17 @@ def _on_auth_clicked(self, _): client_id = settings.get(KEY_CLIENT_ID) client_secret = settings.get(KEY_CLIENT_SECRET) self._plugin_base.auth_callback_fn = self._on_auth_completed - self._plugin_base.backend.update_client_credentials( - client_id, client_secret) + self._plugin_base.backend.update_client_credentials(client_id, client_secret) - def _enable_auth(self): + def _enable_auth(self) -> None: settings = self._plugin_base.get_settings() client_secret = settings.get(KEY_CLIENT_SECRET, "") client_id = settings.get(KEY_CLIENT_ID, "") - self._auth_button.set_sensitive( - len(client_id) > 0 and len(client_secret) > 0) + self._auth_button.set_sensitive(len(client_id) > 0 and len(client_secret) > 0) - def _on_auth_completed(self, success: bool, message: str = ""): + def _on_auth_completed(self, success: bool, message: str = "") -> None: self._enable_auth() if not message: lm_key = "authenticated" if success else "failed" - message = self._plugin_base.lm.get( - f"actions.base.credentials.{lm_key}") + message = self._plugin_base.lm.get(f"actions.base.credentials.{lm_key}") self._update_status(message, not success) diff --git a/twitch_backend.py b/twitch_backend.py index a00bfd5..cabd74c 100644 --- a/twitch_backend.py +++ b/twitch_backend.py @@ -4,74 +4,178 @@ import threading import requests from datetime import datetime, timedelta +from collections import deque +from functools import wraps +from time import sleep +from typing import Callable, Any, Optional +from collections.abc import Sequence from loguru import logger as log from twitchpy.client import Client from streamcontroller_plugin_tools import BackendBase - -def make_handler(plugin_backend: 'Backend'): +from constants import ( + OAUTH_REDIRECT_URI, + OAUTH_PORT, + RATE_LIMIT_CALLS, + RATE_LIMIT_PERIOD, +) + + +class RateLimiter: + """Thread-safe rate limiter using a sliding window algorithm. + + This decorator limits the number of function calls within a specified time period. + It uses a sliding window approach where old calls outside the time window are + automatically removed, and new calls are blocked if the limit is reached. + + Args: + max_calls: Maximum number of calls allowed within the time period + period: Time period in seconds for the rate limit window + + Example: + @RateLimiter(max_calls=100, period=60) + def api_call(): + # This function will be limited to 100 calls per 60 seconds + pass + """ + + def __init__(self, max_calls: int, period: float) -> None: + self.max_calls: int = max_calls + self.period: float = period + self.calls: deque[datetime] = deque() + self.lock: threading.Lock = threading.Lock() + + def __call__(self, func: Callable[..., Any]) -> Callable[..., Any]: + @wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + with self.lock: + now = datetime.now() + # Remove calls outside the time window + while ( + self.calls and (now - self.calls[0]).total_seconds() > self.period + ): + self.calls.popleft() + + # Check if we've hit the rate limit + if len(self.calls) >= self.max_calls: + # Calculate how long to wait + oldest_call = self.calls[0] + wait_time = self.period - (now - oldest_call).total_seconds() + if wait_time > 0: + log.warning( + f"Rate limit reached, waiting {wait_time:.2f} seconds" + ) + sleep(wait_time) + # Clean up old calls again after waiting + now = datetime.now() + while ( + self.calls + and (now - self.calls[0]).total_seconds() > self.period + ): + self.calls.popleft() + + # Record this call + self.calls.append(datetime.now()) + + return func(*args, **kwargs) + + return wrapper + + +def make_handler(plugin_backend: "Backend") -> type[BaseHTTPRequestHandler]: class AuthHandler(BaseHTTPRequestHandler): - def do_GET(self): - if not self.path.startswith('/auth'): + def do_GET(self) -> None: + if not self.path.startswith("/auth"): self.send_response(201) return url_parts = urlparse(self.path) query_params = parse_qs(url_parts.query) - if 'error' in query_params: - message = query_params['error_description'] if 'error_description' in query_params else 'Something went wrong!' + if "error" in query_params: + message = ( + query_params["error_description"] + if "error_description" in query_params + else "Something went wrong!" + ) status = 500 else: - message = 'Success! You may now close this browser window' + message = "Success! You may now close this browser window" status = 200 - shutdown = threading.Thread( - target=self.server.shutdown, daemon=True) + shutdown = threading.Thread(target=self.server.shutdown, daemon=True) shutdown.start() - self.protocol_version = 'HTTP/1.1' + self.protocol_version = "HTTP/1.1" self.send_response(status) - self.send_header('Content-Length', len(message)) + self.send_header("Content-Length", len(message)) self.end_headers() - self.wfile.write(bytes(message, 'utf8')) + self.wfile.write(bytes(message, "utf8")) if status != 200: plugin_backend.auth_failed() return - plugin_backend.new_code(query_params['code'][0]) + plugin_backend.new_code(query_params["code"][0]) return AuthHandler class Backend(BackendBase): - def __init__(self): + """Backend for Twitch API integration. + + Handles authentication, API calls, and rate limiting for Twitch operations. + All API methods are automatically rate-limited to prevent exceeding Twitch's + API limits. + """ + + def __init__(self) -> None: super().__init__() - self.twitch: Client = None - self.user_id: str = None - self.token_path: str = None - self.client_secret: str = None - self.client_id: str = None - self.httpd: HTTPServer = None - self.httpd_thread: threading.Thread = None - self.auth_code: str = None - self.cached_channels: dict = {} + self.twitch: Optional[Client] = None + self.user_id: Optional[str] = None + self.token_path: Optional[str] = None + self.client_secret: Optional[str] = None + self.client_id: Optional[str] = None + self.httpd: Optional[HTTPServer] = None + self.httpd_thread: Optional[threading.Thread] = None + self.auth_code: Optional[str] = None + self.cached_channels: dict[str, str] = {} + self.rate_limiter: RateLimiter = RateLimiter( + RATE_LIMIT_CALLS, RATE_LIMIT_PERIOD + ) def set_token_path(self, path: str) -> None: self.token_path = path - def on_disconnect(self, conn): + def on_disconnect(self, conn: Any) -> None: if self.httpd is not None: - self.httpd.shutdown() + try: + self.httpd.shutdown() + self.httpd.server_close() + except Exception as ex: + log.error(f"Error shutting down HTTP server: {ex}") + self.httpd = None + self.httpd_thread = None super().on_disconnect(conn) - def get_channel_id(self, user_name: str) -> str | None: + def get_channel_id(self, user_name: str) -> Optional[str]: + """Get Twitch channel ID from username. + + Args: + user_name: Twitch username to look up + + Returns: + Channel ID if found, None otherwise. Results are cached for performance. + """ if not user_name: - return + return None if user_name in self.cached_channels: return self.cached_channels[user_name] - users = self.twitch.get_users(None, [user_name]) + @self.rate_limiter + def _get_users() -> Sequence[Any]: + return self.twitch.get_users(None, [user_name]) + + users = _get_users() if users: channel_id = users[0].user_id self.cached_channels[user_name] = channel_id @@ -80,46 +184,77 @@ def get_channel_id(self, user_name: str) -> str | None: return None def create_clip(self) -> None: + """Create a clip of the current live stream.""" if not self.twitch: return self.validate_auth() - self.twitch.create_clip(self.user_id) + + @self.rate_limiter + def _create_clip() -> Any: + return self.twitch.create_clip(self.user_id) + + _create_clip() def create_marker(self) -> None: if not self.twitch: return self.validate_auth() - self.twitch.create_stream_marker(self.user_id) + + @self.rate_limiter + def _create_marker() -> Any: + return self.twitch.create_stream_marker(self.user_id) + + _create_marker() def get_viewers(self) -> str: if not self.twitch: - return + return "" self.validate_auth() - streams = self.twitch.get_streams(first=1, user_id=self.user_id) + + @self.rate_limiter + def _get_streams() -> Sequence[Any]: + return self.twitch.get_streams(first=1, user_id=self.user_id) + + streams = _get_streams() if not streams: - return 'Not Live' + return "Not Live" return str(streams[0].viewer_count) def toggle_chat_mode(self, mode: str) -> bool: if not self.twitch: - return + return False self.validate_auth() - current = self.twitch.get_chat_settings(self.user_id, self.user_id) + + @self.rate_limiter + def _get_settings() -> Any: + return self.twitch.get_chat_settings(self.user_id, self.user_id) + + @self.rate_limiter + def _update_settings(updated_value: bool) -> Any: + return self.twitch.update_chat_settings( + self.user_id, self.user_id, **{mode: updated_value} + ) + + current = _get_settings() updated = not getattr(current, mode) - self.twitch.update_chat_settings( - self.user_id, self.user_id, **{mode: updated}) + _update_settings(updated) return updated - def get_chat_settings(self) -> dict: + def get_chat_settings(self) -> dict[str, bool]: if not self.twitch: return {} self.validate_auth() - current = self.twitch.get_chat_settings(self.user_id, self.user_id) + + @self.rate_limiter + def _get_settings() -> Any: + return self.twitch.get_chat_settings(self.user_id, self.user_id) + + current = _get_settings() return { - 'subscriber_mode': current.subscriber_mode, - 'follower_mode': current.follower_mode, - 'emote_mode': current.emote_mode, - 'slow_mode': current.slow_mode + "subscriber_mode": current.subscriber_mode, + "follower_mode": current.follower_mode, + "emote_mode": current.emote_mode, + "slow_mode": current.slow_mode, } def send_message(self, message: str, user_name: str) -> None: @@ -127,25 +262,45 @@ def send_message(self, message: str, user_name: str) -> None: return self.validate_auth() channel_id = self.get_channel_id(user_name) or self.user_id - self.twitch.send_chat_message(channel_id, self.user_id, message) + + @self.rate_limiter + def _send_message() -> Any: + return self.twitch.send_chat_message(channel_id, self.user_id, message) + + _send_message() def snooze_ad(self) -> None: if not self.twitch: return self.validate_auth() - self.twitch.snooze_next_ad(self.user_id) + + @self.rate_limiter + def _snooze_ad() -> Any: + return self.twitch.snooze_next_ad(self.user_id) + + _snooze_ad() def play_ad(self, length: int) -> None: if not self.twitch: return self.validate_auth() - self.twitch.start_commercial(self.user_id, length) + + @self.rate_limiter + def _start_commercial() -> Any: + return self.twitch.start_commercial(self.user_id, length) + + _start_commercial() def get_next_ad(self) -> tuple[datetime, int]: if not self.twitch: return datetime.now() - timedelta(minutes=1), -1 self.validate_auth() - schedule = self.twitch.get_ad_schedule(self.user_id) + + @self.rate_limiter + def _get_ad_schedule() -> Any: + return self.twitch.get_ad_schedule(self.user_id) + + schedule = _get_ad_schedule() return schedule.next_ad_at, schedule.snooze_count def update_client_credentials(self, client_id: str, client_secret: str) -> None: @@ -154,51 +309,84 @@ def update_client_credentials(self, client_id: str, client_secret: str) -> None: self.client_id = client_id self.client_secret = client_secret params = { - 'client_id': client_id, - 'redirect_uri': 'http://localhost:3000/auth', - 'response_type': 'code', - 'scope': 'user:write:chat channel:manage:broadcast moderator:manage:chat_settings clips:edit channel:read:subscriptions channel:edit:commercial channel:manage:ads channel:read:ads' + "client_id": client_id, + "redirect_uri": OAUTH_REDIRECT_URI, + "response_type": "code", + "scope": "user:write:chat channel:manage:broadcast moderator:manage:chat_settings clips:edit channel:read:subscriptions channel:edit:commercial channel:manage:ads channel:read:ads", } encoded_params = urlencode(params) - if not self.httpd: - self.httpd = HTTPServer(('localhost', 3000), make_handler(self)) + + # Clean up existing server if it exists + if self.httpd is not None: + try: + self.httpd.shutdown() + self.httpd.server_close() + except Exception as ex: + log.error(f"Error shutting down existing HTTP server: {ex}") + + # Create new server + try: + self.httpd = HTTPServer(("localhost", OAUTH_PORT), make_handler(self)) + except Exception as ex: + log.error(f"Failed to create HTTP server on port {OAUTH_PORT}: {ex}") + self.auth_failed("Failed to start local authentication server") + return + + # Create and start server thread if not self.httpd_thread or not self.httpd_thread.is_alive(): self.httpd_thread = threading.Thread( - target=self.httpd.serve_forever, daemon=True) + target=self.httpd.serve_forever, daemon=True + ) if not self.httpd_thread.is_alive(): self.httpd_thread.start() - url = f'https://id.twitch.tv/oauth2/authorize?{encoded_params}' + url = f"https://id.twitch.tv/oauth2/authorize?{encoded_params}" check = requests.get(url, timeout=5) if check.status_code != 200: - message = check.json().get("message") if check.json() else "Incorrect Client ID" + message = ( + check.json().get("message") if check.json() else "Incorrect Client ID" + ) self.auth_failed(message) return - webbrowser.open( - f'https://id.twitch.tv/oauth2/authorize?{encoded_params}') + webbrowser.open(f"https://id.twitch.tv/oauth2/authorize?{encoded_params}") def new_code(self, auth_code: str) -> None: self.auth_with_code(self.client_id, self.client_secret, auth_code) def validate_auth(self) -> None: try: - _ = self.twitch.get_streams(first=1, user_id=self.user_id) + + @self.rate_limiter + def _validate() -> Sequence[Any]: + return self.twitch.get_streams(first=1, user_id=self.user_id) + + _ = _validate() except Exception as ex: - self.auth_with_code( - self.client_id, self.client_secret, self.auth_code) + self.auth_with_code(self.client_id, self.client_secret, self.auth_code) - def auth_with_code(self, client_id: str, client_secret: str, auth_code: str) -> None: + def auth_with_code( + self, client_id: str, client_secret: str, auth_code: str + ) -> None: try: - self.twitch = Client(client_id=client_id, client_secret=client_secret, - tokens_path=self.token_path, redirect_uri='http://localhost:3000/auth', authorization_code=auth_code) - users = self.twitch.get_users() + self.twitch = Client( + client_id=client_id, + client_secret=client_secret, + tokens_path=self.token_path, + redirect_uri=OAUTH_REDIRECT_URI, + authorization_code=auth_code, + ) + + @self.rate_limiter + def _get_users() -> Sequence[Any]: + return self.twitch.get_users() + + users = _get_users() self.auth_code = auth_code self.user_id = users[0].user_id self.client_id = client_id self.client_secret = client_secret - self.frontend.save_auth_settings( - client_id, client_secret, auth_code) + self.frontend.save_auth_settings(client_id, client_secret, auth_code) self.frontend.on_auth_callback(True) except Exception as e: log.error("failed to authenticate", e)