diff --git a/src/services/browser_captcha.py b/src/services/browser_captcha.py index 7ac57ca..64bf8a4 100644 --- a/src/services/browser_captcha.py +++ b/src/services/browser_captcha.py @@ -213,13 +213,14 @@ def _ensure_browser_installed() -> bool: # 代理解析工具函数 # ========================================== def parse_proxy_url(proxy_url: str) -> Optional[Dict[str, str]]: - """解析代理URL""" + """解析代理URL(支持 socks5h://,Playwright 中按 socks5 处理)""" if not proxy_url: return None - if not re.match(r'^(http|https|socks5)://', proxy_url): proxy_url = f"http://{proxy_url}" - match = re.match(r'^(socks5|http|https)://(?:([^:]+):([^@]+)@)?([^:]+):(\d+)$', proxy_url) + if not re.match(r'^(http|https|socks5h?|socks5)://', proxy_url): proxy_url = f"http://{proxy_url}" + match = re.match(r'^(socks5h?|socks5|http|https)://(?:([^:]+):([^@]+)@)?([^:]+):(\d+)$', proxy_url) if match: protocol, username, password, host, port = match.groups() - proxy_config = {'server': f'{protocol}://{host}:{port}'} + browser_protocol = "socks5" if protocol.startswith("socks5") else protocol + proxy_config = {'server': f'{browser_protocol}://{host}:{port}'} if username and password: proxy_config['username'] = username proxy_config['password'] = password @@ -229,8 +230,8 @@ def parse_proxy_url(proxy_url: str) -> Optional[Dict[str, str]]: def normalize_browser_proxy_url(proxy_url: str) -> tuple[Optional[str], Optional[str]]: """将浏览器代理标准化为 Playwright/Chromium 可接受的格式。 - Chromium 不支持带账号密码的 socks5 代理认证。 - 对于 `socks5://user:pass@host:port`,自动降级为 `http://user:pass@host:port`, + Chromium 不支持带账号密码的 socks5/socks5h 代理认证。 + 对于 `socks5(h)://user:pass@host:port`,自动降级为 `http://user:pass@host:port`, 方便兼容同时提供 HTTP/SOCKS5 双入口的代理服务商。 Returns: @@ -240,27 +241,30 @@ def normalize_browser_proxy_url(proxy_url: str) -> tuple[Optional[str], Optional return None, None proxy_url = proxy_url.strip() - match = re.match(r'^(socks5|http|https)://(?:([^:]+):([^@]+)@)?([^:]+):(\d+)$', proxy_url) + match = re.match(r'^(socks5h?|socks5|http|https)://(?:([^:]+):([^@]+)@)?([^:]+):(\d+)$', proxy_url) if not match: - if not re.match(r'^(http|https|socks5)://', proxy_url): + if not re.match(r'^(http|https|socks5h?|socks5)://', proxy_url): proxy_url = f"http://{proxy_url}" return proxy_url, None protocol, username, password, host, port = match.groups() - if protocol == "socks5" and username and password: + if protocol.startswith("socks5") and username and password: normalized = f"http://{username}:{password}@{host}:{port}" warning = ( - "检测到带认证的 SOCKS5 代理。" + f"检测到带认证的 {protocol.upper()} 代理。" "Chromium 不支持 socks5 用户名密码认证," f"已自动改用 HTTP 代理启动浏览器: http://{host}:{port}" ) return normalized, warning + if protocol == "socks5h": + proxy_url = f"socks5://{host}:{port}" + return proxy_url, None def validate_browser_proxy_url(proxy_url: str) -> tuple[bool, str]: if not proxy_url: return True, None - normalized_proxy_url, _ = normalize_browser_proxy_url(proxy_url) + normalized_proxy_url, _ = normalize_browser_proxy_url(proxy_url.strip()) parsed = parse_proxy_url(normalized_proxy_url) if not parsed: return False, "代理格式错误" return True, None diff --git a/src/services/browser_captcha_personal.py b/src/services/browser_captcha_personal.py index d1a1b29..794d0aa 100644 --- a/src/services/browser_captcha_personal.py +++ b/src/services/browser_captcha_personal.py @@ -8,6 +8,10 @@ import time import os import sys +import re +import json +import shutil +import tempfile import subprocess from typing import Optional, Dict, Any, Iterable @@ -139,6 +143,74 @@ def _ensure_nodriver_installed() -> bool: print(f"[BrowserCaptcha] ❌ nodriver 导入失败: {e}") +def _parse_proxy_url(proxy_url: str): + """Parse a proxy URL into (protocol, host, port, username, password).""" + if not proxy_url: + return None, None, None, None, None + url = proxy_url.strip() + if not re.match(r'^(http|https|socks5h?|socks5)://', url): + url = f"http://{url}" + m = re.match(r'^(socks5h?|socks5|http|https)://(?:([^:]+):([^@]+)@)?([^:]+):(\d+)$', url) + if not m: + return None, None, None, None, None + protocol, username, password, host, port = m.groups() + if protocol == "socks5h": + protocol = "socks5" + return protocol, host, port, username, password + + +def _create_proxy_auth_extension(protocol: str, host: str, port: str, username: str, password: str) -> str: + """Create a temporary Chrome extension directory for proxy authentication. + Returns the path to the extension directory.""" + ext_dir = tempfile.mkdtemp(prefix="nodriver_proxy_auth_") + + scheme_map = {"http": "http", "https": "https", "socks5": "socks5"} + scheme = scheme_map.get(protocol, "http") + + manifest = { + "version": "1.0.0", + "manifest_version": 2, + "name": "Proxy Auth Helper", + "permissions": [ + "proxy", "tabs", "unlimitedStorage", "storage", + "", "webRequest", "webRequestBlocking" + ], + "background": {"scripts": ["background.js"]}, + "minimum_chrome_version": "76.0.0" + } + background_js = ( + "var config = {\n" + ' mode: "fixed_servers",\n' + " rules: {\n" + " singleProxy: {\n" + f' scheme: "{scheme}",\n' + f' host: "{host}",\n' + f" port: parseInt({port})\n" + " },\n" + ' bypassList: ["localhost"]\n' + " }\n" + "};\n" + 'chrome.proxy.settings.set({value: config, scope: "regular"}, function(){});\n' + "chrome.webRequest.onAuthRequired.addListener(\n" + " function(details) {\n" + " return {\n" + " authCredentials: {\n" + f' username: "{username}",\n' + f' password: "{password}"\n' + " }\n" + " };\n" + " },\n" + ' {urls: [""]},\n' + " ['blocking']\n" + ");\n" + ) + with open(os.path.join(ext_dir, "manifest.json"), "w", encoding="utf-8") as f: + json.dump(manifest, f, indent=2) + with open(os.path.join(ext_dir, "background.js"), "w", encoding="utf-8") as f: + f.write(background_js) + return ext_dir + + class ResidentTabInfo: """常驻标签页信息结构""" def __init__(self, tab, slot_id: str, project_id: Optional[str] = None): @@ -197,6 +269,8 @@ def __init__(self, db=None): self._recaptcha_ready = False # 向后兼容 self._last_fingerprint: Optional[Dict[str, Any]] = None self._resident_error_streaks: dict[str, int] = {} + self._proxy_url: Optional[str] = None + self._proxy_ext_dir: Optional[str] = None # 自定义站点打码常驻页(用于 score-test) self._custom_tabs: dict[str, Dict[str, Any]] = {} self._custom_lock = asyncio.Lock() @@ -615,6 +689,8 @@ async def _shutdown_browser_runtime_locked(self, reason: str): self.browser = None self._initialized = False self._last_fingerprint = None + self._cleanup_proxy_extension() + self._proxy_url = None async with self._resident_lock: resident_items = list(self._resident_tabs.values()) @@ -652,6 +728,40 @@ async def close_once(tab): f"[BrowserCaptcha] 停止浏览器实例失败 ({reason}): {e}" ) + async def _resolve_personal_proxy(self): + """Read proxy config for personal captcha browser. + Priority: captcha browser_proxy > request proxy.""" + if not self.db: + return None, None, None, None, None + try: + captcha_cfg = await self.db.get_captcha_config() + if captcha_cfg.browser_proxy_enabled and captcha_cfg.browser_proxy_url: + url = captcha_cfg.browser_proxy_url.strip() + if url: + debug_logger.log_info(f"[BrowserCaptcha] Personal 使用验证码代理: {url}") + return _parse_proxy_url(url) + except Exception as e: + debug_logger.log_warning(f"[BrowserCaptcha] 读取验证码代理配置失败: {e}") + try: + proxy_cfg = await self.db.get_proxy_config() + if proxy_cfg and proxy_cfg.enabled and proxy_cfg.proxy_url: + url = proxy_cfg.proxy_url.strip() + if url: + debug_logger.log_info(f"[BrowserCaptcha] Personal 回退使用请求代理: {url}") + return _parse_proxy_url(url) + except Exception as e: + debug_logger.log_warning(f"[BrowserCaptcha] 读取请求代理配置失败: {e}") + return None, None, None, None, None + + def _cleanup_proxy_extension(self): + """Remove temporary proxy auth extension directory.""" + if self._proxy_ext_dir and os.path.isdir(self._proxy_ext_dir): + try: + shutil.rmtree(self._proxy_ext_dir, ignore_errors=True) + except Exception: + pass + self._proxy_ext_dir = None + async def initialize(self): """初始化 nodriver 浏览器""" self._check_available() @@ -690,27 +800,49 @@ async def initialize(self): f"[BrowserCaptcha] 使用指定浏览器可执行文件: {browser_executable_path}" ) + # 解析代理配置 + self._cleanup_proxy_extension() + self._proxy_url = None + protocol, host, port, username, password = await self._resolve_personal_proxy() + proxy_server_arg = None + if protocol and host and port: + if username and password: + self._proxy_ext_dir = _create_proxy_auth_extension(protocol, host, port, username, password) + debug_logger.log_info( + f"[BrowserCaptcha] Personal 代理需要认证,已创建扩展: {self._proxy_ext_dir}" + ) + proxy_server_arg = f"--proxy-server={protocol}://{host}:{port}" + self._proxy_url = f"{protocol}://{host}:{port}" + debug_logger.log_info(f"[BrowserCaptcha] Personal 浏览器代理: {self._proxy_url}") + + browser_args = [ + '--disable-dev-shm-usage', + '--disable-setuid-sandbox', + '--disable-gpu', + '--window-size=1280,720', + '--window-position=3000,3000', + '--profile-directory=Default', + '--disable-background-networking', + '--disable-sync', + '--disable-translate', + '--disable-default-apps', + '--no-first-run', + '--no-default-browser-check', + ] + if proxy_server_arg: + browser_args.append(proxy_server_arg) + if self._proxy_ext_dir: + browser_args.append(f'--load-extension={self._proxy_ext_dir}') + else: + browser_args.append('--disable-extensions') + # 启动 nodriver 浏览器(后台启动,不占用前台) config = uc.Config( headless=self.headless, user_data_dir=self.user_data_dir, browser_executable_path=browser_executable_path, sandbox=False, - browser_args=[ - '--disable-dev-shm-usage', - '--disable-setuid-sandbox', - '--disable-gpu', - '--window-size=1280,720', - '--window-position=3000,3000', # 窗口位置移到屏幕外 - '--profile-directory=Default', - '--disable-extensions', - '--disable-background-networking', - '--disable-sync', - '--disable-translate', - '--disable-default-apps', - '--no-first-run', - '--no-default-browser-check', - ] + browser_args=browser_args, ) self.browser = await self._run_with_timeout( uc.start(config), @@ -1491,8 +1623,7 @@ async def _extract_tab_fingerprint(self, tab) -> Optional[Dict[str, Any]]: if not isinstance(fingerprint, dict): return None - # personal 模式当前未单独配置浏览器代理,显式使用直连,避免与全局代理混淆。 - result: Dict[str, Any] = {"proxy_url": None} + result: Dict[str, Any] = {"proxy_url": self._proxy_url} for key in ("user_agent", "accept_language", "sec_ch_ua", "sec_ch_ua_mobile", "sec_ch_ua_platform"): value = fingerprint.get(key) if isinstance(value, str) and value: @@ -2212,7 +2343,7 @@ async def get_custom_token( extracted_fingerprint = { "user_agent": fallback_ua or "", "accept_language": fallback_lang or "", - "proxy_url": None, + "proxy_url": self._proxy_url, } except Exception: extracted_fingerprint = None diff --git a/src/services/proxy_manager.py b/src/services/proxy_manager.py index 6ea5cb6..8d0caa7 100644 --- a/src/services/proxy_manager.py +++ b/src/services/proxy_manager.py @@ -47,10 +47,6 @@ def _parse_proxy_line(self, line: str) -> Optional[str]: # 协议前缀格式 if line.startswith(("http://", "https://", "socks5://", "socks5h://")): - # socks5h 统一转 socks5,便于后续处理 - if line.startswith("socks5h://"): - line = "socks5://" + line[len("socks5h://"):] - # 已是标准 user:pass@host:port(或 host:port) if "@" in line: return line