Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 15 additions & 11 deletions src/services/browser_captcha.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,14 @@ def _ensure_browser_installed() -> bool:
# 代理解析工具函数
# ==========================================
def parse_proxy_url(proxy_url: str) -> Optional[Dict[str, str]]:
"""解析代理URL"""
"""解析代理URL(支持 socks5h://,Playwright 中按 socks5 处理)"""
if not proxy_url: return None
if not re.match(r'^(http|https|socks5)://', proxy_url): proxy_url = f"http://{proxy_url}"
match = re.match(r'^(socks5|http|https)://(?:([^:]+):([^@]+)@)?([^:]+):(\d+)$', proxy_url)
if not re.match(r'^(http|https|socks5h?|socks5)://', proxy_url): proxy_url = f"http://{proxy_url}"
match = re.match(r'^(socks5h?|socks5|http|https)://(?:([^:]+):([^@]+)@)?([^:]+):(\d+)$', proxy_url)
if match:
protocol, username, password, host, port = match.groups()
proxy_config = {'server': f'{protocol}://{host}:{port}'}
browser_protocol = "socks5" if protocol.startswith("socks5") else protocol
proxy_config = {'server': f'{browser_protocol}://{host}:{port}'}
if username and password:
proxy_config['username'] = username
proxy_config['password'] = password
Expand All @@ -229,8 +230,8 @@ def parse_proxy_url(proxy_url: str) -> Optional[Dict[str, str]]:
def normalize_browser_proxy_url(proxy_url: str) -> tuple[Optional[str], Optional[str]]:
"""将浏览器代理标准化为 Playwright/Chromium 可接受的格式。

Chromium 不支持带账号密码的 socks5 代理认证。
对于 `socks5://user:pass@host:port`,自动降级为 `http://user:pass@host:port`,
Chromium 不支持带账号密码的 socks5/socks5h 代理认证。
对于 `socks5(h)://user:pass@host:port`,自动降级为 `http://user:pass@host:port`,
方便兼容同时提供 HTTP/SOCKS5 双入口的代理服务商。

Returns:
Expand All @@ -240,27 +241,30 @@ def normalize_browser_proxy_url(proxy_url: str) -> tuple[Optional[str], Optional
return None, None

proxy_url = proxy_url.strip()
match = re.match(r'^(socks5|http|https)://(?:([^:]+):([^@]+)@)?([^:]+):(\d+)$', proxy_url)
match = re.match(r'^(socks5h?|socks5|http|https)://(?:([^:]+):([^@]+)@)?([^:]+):(\d+)$', proxy_url)
if not match:
if not re.match(r'^(http|https|socks5)://', proxy_url):
if not re.match(r'^(http|https|socks5h?|socks5)://', proxy_url):
proxy_url = f"http://{proxy_url}"
return proxy_url, None

protocol, username, password, host, port = match.groups()
if protocol == "socks5" and username and password:
if protocol.startswith("socks5") and username and password:
normalized = f"http://{username}:{password}@{host}:{port}"
warning = (
"检测到带认证的 SOCKS5 代理。"
f"检测到带认证的 {protocol.upper()} 代理。"
"Chromium 不支持 socks5 用户名密码认证,"
f"已自动改用 HTTP 代理启动浏览器: http://{host}:{port}"
)
return normalized, warning

if protocol == "socks5h":
proxy_url = f"socks5://{host}:{port}"

return proxy_url, None

def validate_browser_proxy_url(proxy_url: str) -> tuple[bool, Optional[str]]:
    """Check whether a browser proxy URL can be used.

    The URL is stripped, normalized with ``normalize_browser_proxy_url`` and
    then parsed with ``parse_proxy_url``; it is considered valid when parsing
    yields a non-empty result.

    Args:
        proxy_url: Proxy URL entered by the user. An empty or ``None`` value
            means "no proxy" and is treated as valid.

    Returns:
        ``(True, None)`` when the value is empty or parses successfully,
        otherwise ``(False, <error message>)``.
    """
    if not proxy_url:
        return True, None
    # Strip first so surrounding whitespace is never seen by the normalizer.
    normalized_proxy_url, _ = normalize_browser_proxy_url(proxy_url.strip())
    if not parse_proxy_url(normalized_proxy_url):
        return False, "代理格式错误"
    return True, None
Expand Down
167 changes: 149 additions & 18 deletions src/services/browser_captcha_personal.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
import time
import os
import sys
import re
import json
import shutil
import tempfile
import subprocess
from typing import Optional, Dict, Any, Iterable

Expand Down Expand Up @@ -139,6 +143,74 @@ def _ensure_nodriver_installed() -> bool:
print(f"[BrowserCaptcha] ❌ nodriver 导入失败: {e}")


def _parse_proxy_url(proxy_url: str):
    """Break a proxy URL down into its components.

    A value without a recognised scheme prefix is treated as ``http://``.
    ``socks5h`` is reported as ``socks5``.

    Returns:
        A ``(protocol, host, port, username, password)`` tuple. ``port`` is
        the matched digit string; ``username`` and ``password`` are ``None``
        when the URL carries no credentials. Five ``None`` values are
        returned when the input is empty or does not match the expected
        ``scheme://[user:pass@]host:port`` shape.
    """
    unparsed = (None, None, None, None, None)
    if not proxy_url:
        return unparsed

    candidate = proxy_url.strip()
    has_scheme = re.match(r'^(http|https|socks5h?|socks5)://', candidate)
    if has_scheme is None:
        candidate = f"http://{candidate}"

    parsed = re.match(
        r'^(socks5h?|socks5|http|https)://(?:([^:]+):([^@]+)@)?([^:]+):(\d+)$',
        candidate,
    )
    if parsed is None:
        return unparsed

    scheme, user, secret, hostname, port_text = parsed.groups()
    effective_scheme = "socks5" if scheme == "socks5h" else scheme
    return effective_scheme, hostname, port_text, user, secret


def _create_proxy_auth_extension(protocol: str, host: str, port: str, username: str, password: str) -> str:
    """Create a temporary Chrome extension directory for proxy authentication.

    The extension consists of a ``manifest.json`` and a ``background.js`` that
    (1) pins the browser to a single fixed proxy via ``chrome.proxy`` and
    (2) answers ``onAuthRequired`` challenges with the given credentials.

    Args:
        protocol: Proxy scheme; anything other than ``http``, ``https`` or
            ``socks5`` falls back to ``http``.
        host: Proxy host name or address.
        port: Proxy port as a decimal string.
        username: Proxy user name.
        password: Proxy password.

    Returns:
        Path of the newly created extension directory. The caller owns the
        directory and is responsible for deleting it.

    Raises:
        ValueError: If ``port`` is not a valid integer.
        OSError: If the extension files cannot be written (the temporary
            directory is removed before the error propagates).
    """
    scheme = protocol if protocol in ("http", "https", "socks5") else "http"
    # Validate the port before touching the filesystem so a bad value cannot
    # leave an orphaned temp directory behind (and cannot be injected as JS).
    port_number = int(port)

    # NOTE(review): this is a Manifest V2 extension relying on blocking
    # webRequest; recent Chrome releases are phasing MV2 out - confirm the
    # targeted browser builds still load it.
    # NOTE(review): onAuthRequired presumably only fires for HTTP(S) proxy
    # challenges, so this likely has no effect for socks5 credentials - verify.
    manifest = {
        "version": "1.0.0",
        "manifest_version": 2,
        "name": "Proxy Auth Helper",
        "permissions": [
            "proxy", "tabs", "unlimitedStorage", "storage",
            "<all_urls>", "webRequest", "webRequestBlocking"
        ],
        "background": {"scripts": ["background.js"]},
        "minimum_chrome_version": "76.0.0"
    }

    # json.dumps produces a correctly quoted/escaped literal that is also a
    # valid JavaScript string, so quotes, backslashes or newlines inside the
    # credentials can neither break the script nor inject code into it.
    background_js = (
        "var config = {\n"
        '  mode: "fixed_servers",\n'
        "  rules: {\n"
        "    singleProxy: {\n"
        f"      scheme: {json.dumps(scheme)},\n"
        f"      host: {json.dumps(host)},\n"
        f"      port: {port_number}\n"
        "    },\n"
        '    bypassList: ["localhost"]\n'
        "  }\n"
        "};\n"
        'chrome.proxy.settings.set({value: config, scope: "regular"}, function(){});\n'
        "chrome.webRequest.onAuthRequired.addListener(\n"
        "  function(details) {\n"
        "    return {\n"
        "      authCredentials: {\n"
        f"        username: {json.dumps(username)},\n"
        f"        password: {json.dumps(password)}\n"
        "      }\n"
        "    };\n"
        "  },\n"
        '  {urls: ["<all_urls>"]},\n'
        "  ['blocking']\n"
        ");\n"
    )

    # mkdtemp creates the directory readable only by the current user, which
    # matters because background.js stores the credentials in plain text.
    ext_dir = tempfile.mkdtemp(prefix="nodriver_proxy_auth_")
    try:
        with open(os.path.join(ext_dir, "manifest.json"), "w", encoding="utf-8") as f:
            json.dump(manifest, f, indent=2)
        with open(os.path.join(ext_dir, "background.js"), "w", encoding="utf-8") as f:
            f.write(background_js)
    except Exception:
        # Do not leak a half-written directory containing credentials.
        shutil.rmtree(ext_dir, ignore_errors=True)
        raise
    return ext_dir


class ResidentTabInfo:
"""常驻标签页信息结构"""
def __init__(self, tab, slot_id: str, project_id: Optional[str] = None):
Expand Down Expand Up @@ -197,6 +269,8 @@ def __init__(self, db=None):
self._recaptcha_ready = False # 向后兼容
self._last_fingerprint: Optional[Dict[str, Any]] = None
self._resident_error_streaks: dict[str, int] = {}
self._proxy_url: Optional[str] = None
self._proxy_ext_dir: Optional[str] = None
# 自定义站点打码常驻页(用于 score-test)
self._custom_tabs: dict[str, Dict[str, Any]] = {}
self._custom_lock = asyncio.Lock()
Expand Down Expand Up @@ -615,6 +689,8 @@ async def _shutdown_browser_runtime_locked(self, reason: str):
self.browser = None
self._initialized = False
self._last_fingerprint = None
self._cleanup_proxy_extension()
self._proxy_url = None

async with self._resident_lock:
resident_items = list(self._resident_tabs.values())
Expand Down Expand Up @@ -652,6 +728,40 @@ async def close_once(tab):
f"[BrowserCaptcha] 停止浏览器实例失败 ({reason}): {e}"
)

async def _resolve_personal_proxy(self):
    """Read the proxy configuration for the personal captcha browser.

    Sources are consulted in priority order: the captcha ``browser_proxy``
    setting first, then the general request proxy. The first source that is
    enabled and has a non-blank URL wins, and its URL is handed to
    ``_parse_proxy_url``. A failure while reading one source is logged as a
    warning and the next source is tried.

    Returns:
        The ``(protocol, host, port, username, password)`` tuple produced by
        ``_parse_proxy_url``, or five ``None`` values when there is no
        database handle or no usable proxy is configured.
    """
    no_proxy = (None, None, None, None, None)
    if not self.db:
        return no_proxy

    def _redact(raw_url: str) -> str:
        # Proxy URLs may embed "user:pass@"; never write that part to logs.
        return re.sub(r'^((?:[A-Za-z][A-Za-z0-9+.\-]*://)?)[^/]*@', r'\1***@', raw_url)

    try:
        captcha_cfg = await self.db.get_captcha_config()
        if captcha_cfg.browser_proxy_enabled and captcha_cfg.browser_proxy_url:
            url = captcha_cfg.browser_proxy_url.strip()
            if url:
                debug_logger.log_info(f"[BrowserCaptcha] Personal 使用验证码代理: {_redact(url)}")
                return _parse_proxy_url(url)
    except Exception as e:
        # Best effort: fall through to the request proxy below.
        debug_logger.log_warning(f"[BrowserCaptcha] 读取验证码代理配置失败: {e}")

    try:
        proxy_cfg = await self.db.get_proxy_config()
        if proxy_cfg and proxy_cfg.enabled and proxy_cfg.proxy_url:
            url = proxy_cfg.proxy_url.strip()
            if url:
                debug_logger.log_info(f"[BrowserCaptcha] Personal 回退使用请求代理: {_redact(url)}")
                return _parse_proxy_url(url)
    except Exception as e:
        debug_logger.log_warning(f"[BrowserCaptcha] 读取请求代理配置失败: {e}")

    return no_proxy

def _cleanup_proxy_extension(self):
    """Delete the temporary proxy-auth extension directory, if one exists.

    Removal is best effort: errors are ignored. ``self._proxy_ext_dir`` is
    reset to ``None`` afterwards whether or not anything was deleted.
    """
    ext_dir = self._proxy_ext_dir
    should_remove = bool(ext_dir) and os.path.isdir(ext_dir)
    if should_remove:
        try:
            shutil.rmtree(ext_dir, ignore_errors=True)
        except Exception:
            pass
    self._proxy_ext_dir = None

async def initialize(self):
"""初始化 nodriver 浏览器"""
self._check_available()
Expand Down Expand Up @@ -690,27 +800,49 @@ async def initialize(self):
f"[BrowserCaptcha] 使用指定浏览器可执行文件: {browser_executable_path}"
)

# 解析代理配置
self._cleanup_proxy_extension()
self._proxy_url = None
protocol, host, port, username, password = await self._resolve_personal_proxy()
proxy_server_arg = None
if protocol and host and port:
if username and password:
self._proxy_ext_dir = _create_proxy_auth_extension(protocol, host, port, username, password)
debug_logger.log_info(
f"[BrowserCaptcha] Personal 代理需要认证,已创建扩展: {self._proxy_ext_dir}"
)
proxy_server_arg = f"--proxy-server={protocol}://{host}:{port}"
self._proxy_url = f"{protocol}://{host}:{port}"
debug_logger.log_info(f"[BrowserCaptcha] Personal 浏览器代理: {self._proxy_url}")

browser_args = [
'--disable-dev-shm-usage',
'--disable-setuid-sandbox',
'--disable-gpu',
'--window-size=1280,720',
'--window-position=3000,3000',
'--profile-directory=Default',
'--disable-background-networking',
'--disable-sync',
'--disable-translate',
'--disable-default-apps',
'--no-first-run',
'--no-default-browser-check',
]
if proxy_server_arg:
browser_args.append(proxy_server_arg)
if self._proxy_ext_dir:
browser_args.append(f'--load-extension={self._proxy_ext_dir}')
else:
browser_args.append('--disable-extensions')

# 启动 nodriver 浏览器(后台启动,不占用前台)
config = uc.Config(
headless=self.headless,
user_data_dir=self.user_data_dir,
browser_executable_path=browser_executable_path,
sandbox=False,
browser_args=[
'--disable-dev-shm-usage',
'--disable-setuid-sandbox',
'--disable-gpu',
'--window-size=1280,720',
'--window-position=3000,3000', # 窗口位置移到屏幕外
'--profile-directory=Default',
'--disable-extensions',
'--disable-background-networking',
'--disable-sync',
'--disable-translate',
'--disable-default-apps',
'--no-first-run',
'--no-default-browser-check',
]
browser_args=browser_args,
)
self.browser = await self._run_with_timeout(
uc.start(config),
Expand Down Expand Up @@ -1491,8 +1623,7 @@ async def _extract_tab_fingerprint(self, tab) -> Optional[Dict[str, Any]]:
if not isinstance(fingerprint, dict):
return None

# personal 模式当前未单独配置浏览器代理,显式使用直连,避免与全局代理混淆。
result: Dict[str, Any] = {"proxy_url": None}
result: Dict[str, Any] = {"proxy_url": self._proxy_url}
for key in ("user_agent", "accept_language", "sec_ch_ua", "sec_ch_ua_mobile", "sec_ch_ua_platform"):
value = fingerprint.get(key)
if isinstance(value, str) and value:
Expand Down Expand Up @@ -2212,7 +2343,7 @@ async def get_custom_token(
extracted_fingerprint = {
"user_agent": fallback_ua or "",
"accept_language": fallback_lang or "",
"proxy_url": None,
"proxy_url": self._proxy_url,
}
except Exception:
extracted_fingerprint = None
Expand Down
4 changes: 0 additions & 4 deletions src/services/proxy_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@ def _parse_proxy_line(self, line: str) -> Optional[str]:

# 协议前缀格式
if line.startswith(("http://", "https://", "socks5://", "socks5h://")):
# socks5h 统一转 socks5,便于后续处理
if line.startswith("socks5h://"):
line = "socks5://" + line[len("socks5h://"):]

# 已是标准 user:pass@host:port(或 host:port)
if "@" in line:
return line
Expand Down