diff --git a/.gitignore b/.gitignore
index 09cf776..5f5986c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -272,3 +272,4 @@ conf/*
.DS_Store
/.claude/
/apps/worker/conf/
+apps/worker/celerybeat-schedule.db
diff --git a/CLAUDE.md b/CLAUDE.md
index 09eb0e0..4f0b52f 100644
--- a/CLAUDE.md
+++ b/CLAUDE.md
@@ -33,7 +33,7 @@ The Telegram Bot communicates with the API server over HTTP (`API_SERVER_URL`).
- **`main.py`** — FastAPI app setup, Sentry integration, lifecycle management
- **`config.py`** — Environment variable handling, platform credentials
- **`routers/`** — `scraper.py` (generic endpoint), `scraper_routers.py` (platform-specific), `inoreader.py`, `wechat.py`
-- **`services/scrapers/`** — `scraper_manager.py` orchestrates platform scrapers (twitter, weibo, bluesky, xiaohongshu, reddit, instagram, zhihu, douban, threads, wechat, general)
+- **`services/scrapers/`** — `scraper_manager.py` orchestrates platform scrapers (twitter, weibo, bluesky, xiaohongshu, reddit, instagram, zhihu, douban, threads, wechat, general); the Xiaohongshu scraper uses `xiaohongshu/adaptar.py` (`XhsSinglePostAdapter`) with an external sign server instead of the old Playwright-based crawler
- **`services/file_export/`** — PDF generation, audio transcription (OpenAI), video download
- **`services/amazon/s3.py`** — S3 storage integration
- **`services/telegraph/`** — Telegraph content publishing
@@ -50,7 +50,7 @@ The Telegram Bot communicates with the API server over HTTP (`API_SERVER_URL`).
### Shared Library (`packages/shared/fastfetchbot_shared/`)
-- **`config.py`** — URL patterns (SOCIAL_MEDIA_WEBSITE_PATTERNS, VIDEO_WEBSITE_PATTERNS, BANNED_PATTERNS)
+- **`config.py`** — URL patterns (SOCIAL_MEDIA_WEBSITE_PATTERNS, VIDEO_WEBSITE_PATTERNS, BANNED_PATTERNS); shared env vars including `SIGN_SERVER_URL` and `XHS_COOKIE_PATH`
- **`models/`** — `classes.py` (NamedBytesIO), `metadata_item.py`, `telegraph_item.py`, `url_metadata.py`
- **`utils/`** — `parse.py` (URL parsing, HTML processing, `get_env_bool`), `image.py`, `logger.py`, `network.py`
@@ -128,6 +128,8 @@ See `template.env` for a complete reference. Key variables:
- Most scrapers require authentication cookies/tokens
- Use browser extension "Get cookies.txt LOCALLY" to extract cookies
- Store Zhihu cookies in `conf/zhihu_cookies.json`
+- Store Xiaohongshu cookies in `conf/xhs_cookies.txt` (single-line cookie string, e.g. `a1=x; web_id=x; web_session=x`)
+- Xiaohongshu also requires an external **sign server** reachable at `SIGN_SERVER_URL` (default `http://localhost:8989`); the sign server is currently closed-source — you must supply your own compatible implementation
- See `template.env` for all platform-specific variables (Twitter, Weibo, Xiaohongshu, Reddit, Instagram, Bluesky, etc.)
### Database
diff --git a/README.md b/README.md
index a0c0a49..83c037a 100644
--- a/README.md
+++ b/README.md
@@ -154,10 +154,46 @@ See `template.env` for a complete reference with comments.
| Twitter | `TWITTER_CT0`, `TWITTER_AUTH_TOKEN` |
| Reddit | `REDDIT_CLIENT_ID`, `REDDIT_CLIENT_SECRET`, `REDDIT_USERNAME`, `REDDIT_PASSWORD` |
| Weibo | `WEIBO_COOKIES` |
-| Xiaohongshu | `XIAOHONGSHU_A1`, `XIAOHONGSHU_WEBID`, `XIAOHONGSHU_WEBSESSION` |
+| Xiaohongshu | See [Xiaohongshu Setup](#xiaohongshu-setup) below |
| Instagram | `X_RAPIDAPI_KEY` |
| Zhihu | Store cookies in `conf/zhihu_cookies.json` |
+#### Xiaohongshu Setup
+
+Xiaohongshu (XHS) API requests require cryptographic signature headers (`x-s`, `x-t`, etc.). FastFetchBot delegates signature computation to an external **sign server**.
+
+> **Note:** We currently use a closed-source sign server. You will need to run your own compatible signing proxy and point `SIGN_SERVER_URL` at it.
+
+The sign server must accept `POST /signsrv/v1/xhs/sign` with a JSON body:
+
+```json
+{"uri": "/api/sns/web/v1/feed", "data": {...}, "cookies": "a1=..."}
+```
+
+and return:
+
+```json
+{"isok": true, "data": {"x_s": "...", "x_t": "...", "x_s_common": "...", "x_b3_traceid": "..."}}
+```
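As a standalone contract check (illustrative only — `sign_response_to_headers` is not a function in this repo), the mapping from a successful sign response to the request headers the adapter attaches looks roughly like this; the header names (`X-s`, `X-t`, `x-s-common`, `X-B3-Traceid`) match the adapter code in this PR:

```python
from typing import Dict

# Fields a compatible sign server must return under "data".
REQUIRED_FIELDS = ("x_s", "x_t", "x_s_common", "x_b3_traceid")


def sign_response_to_headers(body: Dict) -> Dict[str, str]:
    """Validate a sign-server response and map it to XHS request headers."""
    if not body.get("isok"):
        raise RuntimeError(f"sign server returned error: {body}")
    data = body.get("data", {}) or {}
    missing = [k for k in REQUIRED_FIELDS if k not in data]
    if missing:
        raise RuntimeError(f"sign response missing fields: {missing}")
    return {
        "X-s": data["x_s"],
        "X-t": data["x_t"],
        "x-s-common": data["x_s_common"],
        "X-B3-Traceid": data["x_b3_traceid"],
    }
```

Any signing proxy that produces a response passing this mapping should work with the adapter.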
+
+**Cookie configuration** (two options; file takes priority):
+
+- **File (recommended):** Create `apps/api/conf/xhs_cookies.txt` containing your XHS cookies as a single line:
+ ```
+ a1=xxxxxxxx; web_id=xxxxxxxx; web_session=xxxxxxxx
+ ```
+ Log in to [xiaohongshu.com](https://www.xiaohongshu.com) in your browser, then copy the cookie values from DevTools → Application → Cookies, or use the [Get cookies.txt LOCALLY](https://chrome.google.com/webstore/detail/get-cookiestxt-locally/cclelndahbckbenkjhflpdbgdldlbecc) extension.
+
+- **Environment variables (legacy fallback):** Set `XIAOHONGSHU_A1`, `XIAOHONGSHU_WEBID`, and `XIAOHONGSHU_WEBSESSION` individually. Used only when the cookie file is absent.
+
+| Variable | Default | Description |
+|----------|---------|-------------|
+| `SIGN_SERVER_URL` | `http://localhost:8989` | URL of the XHS signing proxy |
+| `XHS_COOKIE_PATH` | `conf/xhs_cookies.txt` | Path to cookie file (overrides default location) |
+| `XIAOHONGSHU_A1` | `None` | `a1` cookie value (legacy fallback) |
+| `XIAOHONGSHU_WEBID` | `None` | `web_id` cookie value (legacy fallback) |
+| `XIAOHONGSHU_WEBSESSION` | `None` | `web_session` cookie value (legacy fallback) |
+
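+A minimal `template.env`-style fragment wiring these together (values are the defaults from the table above; substitute your own sign server URL and cookie path if they differ):
+
+```
+SIGN_SERVER_URL=http://localhost:8989
+XHS_COOKIE_PATH=conf/xhs_cookies.txt
+```
+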
#### Cloud Services
| Variable | Description |
@@ -193,7 +229,7 @@ See `template.env` for a complete reference with comments.
- [x] WeChat Public Account Articles
- [x] Zhihu
- [x] Douban
-- [ ] Xiaohongshu
+- [x] Xiaohongshu
### Video
@@ -211,7 +247,7 @@ The GitHub Actions pipeline (`.github/workflows/ci.yml`) automatically builds an
The HTML to Telegra.ph converter function is based on [html-telegraph-poster](https://github.com/mercuree/html-telegraph-poster). I separated it from this project as an independent Python package: [html-telegraph-poster-v2](https://github.com/aturret/html-telegraph-poster-v2).
-The Xiaohongshu scraper is based on [MediaCrawler](https://github.com/NanmiCoder/MediaCrawler).
+The original Xiaohongshu scraper was based on [MediaCrawler](https://github.com/NanmiCoder/MediaCrawler). The current implementation uses a custom httpx-based adapter with an external signing proxy.
The Weibo scraper is based on [weiboSpider](https://github.com/dataabc/weiboSpider).
diff --git a/apps/api/src/config.py b/apps/api/src/config.py
index e02c3af..4da6b3b 100644
--- a/apps/api/src/config.py
+++ b/apps/api/src/config.py
@@ -6,6 +6,7 @@
import gettext
import secrets
+from fastfetchbot_shared.utils.logger import logger
from fastfetchbot_shared.utils.parse import get_env_bool
env = os.environ
@@ -89,6 +90,32 @@
XHS_ENABLE_IP_PROXY = get_env_bool(env, "XHS_ENABLE_IP_PROXY", False)
XHS_SAVE_LOGIN_STATE = get_env_bool(env, "XHS_SAVE_LOGIN_STATE", True)
+# XHS sign server and cookie file
+from fastfetchbot_shared.config import SIGN_SERVER_URL as XHS_SIGN_SERVER_URL
+from fastfetchbot_shared.config import XHS_COOKIE_PATH as _XHS_COOKIE_PATH
+
+xhs_cookie_path = _XHS_COOKIE_PATH or os.path.join(conf_dir, "xhs_cookies.txt")
+
+# Load XHS cookies from file (similar to Zhihu cookie loading)
+XHS_COOKIE_STRING = ""
+if os.path.exists(xhs_cookie_path):
+ try:
+ with open(xhs_cookie_path, "r", encoding="utf-8") as f:
+ XHS_COOKIE_STRING = f.read().strip()
+ except (IOError, OSError) as e:
+ logger.error(f"Error reading XHS cookie file: {e}")
+ XHS_COOKIE_STRING = ""
+else:
+ # Fallback: build cookie string from individual env vars (backward compat)
+ cookie_parts = []
+ if XIAOHONGSHU_A1:
+ cookie_parts.append(f"a1={XIAOHONGSHU_A1}")
+ if XIAOHONGSHU_WEBID:
+ cookie_parts.append(f"web_id={XIAOHONGSHU_WEBID}")
+ if XIAOHONGSHU_WEBSESSION:
+ cookie_parts.append(f"web_session={XIAOHONGSHU_WEBSESSION}")
+ XHS_COOKIE_STRING = "; ".join(cookie_parts)
+
# Zhihu
FXZHIHU_HOST = env.get("FXZHIHU_HOST", "fxzhihu.com")
diff --git a/apps/api/src/services/scrapers/xiaohongshu/__init__.py b/apps/api/src/services/scrapers/xiaohongshu/__init__.py
index 6225a35..bd66cea 100644
--- a/apps/api/src/services/scrapers/xiaohongshu/__init__.py
+++ b/apps/api/src/services/scrapers/xiaohongshu/__init__.py
@@ -1,23 +1,14 @@
-import asyncio
from typing import Any
-from urllib.parse import urlparse
-
-import httpx
-import jmespath
from fastfetchbot_shared.models.metadata_item import MetadataItem, MediaFile, MessageType
-from fastfetchbot_shared.utils.network import HEADERS
-from src.config import JINJA2_ENV, HTTP_REQUEST_TIMEOUT
-from .xhs.core import XiaoHongShuCrawler
-from .xhs.client import XHSClient
-from .xhs import proxy_account_pool
-
from fastfetchbot_shared.utils.logger import logger
from fastfetchbot_shared.utils.parse import (
unix_timestamp_to_utc,
get_html_text_length,
wrap_text_into_html,
)
+from src.config import JINJA2_ENV, XHS_COOKIE_STRING, XHS_SIGN_SERVER_URL
+from .adaptar import XhsSinglePostAdapter
environment = JINJA2_ENV
short_text_template = environment.get_template("xiaohongshu_short_text.jinja2")
@@ -42,78 +33,51 @@ def __init__(self, url: str, data: Any, **kwargs):
self.raw_content = None
async def get_item(self) -> dict:
- await self.get_xiaohongshu()
+ await self._get_xiaohongshu()
return self.to_dict()
- async def get_xiaohongshu(self) -> None:
- if self.url.find("xiaohongshu.com") == -1:
- async with httpx.AsyncClient() as client:
- resp = await client.get(
- self.url,
- headers=HEADERS,
- follow_redirects=True,
- timeout=HTTP_REQUEST_TIMEOUT,
- )
- if (
- resp.history
- ): # if there is a redirect, the request will have a response chain
- for h in resp.history:
- print(h.status_code, h.url)
- self.url = str(resp.url)
- urlparser = urlparse(self.url)
- self.id = urlparser.path.split("/")[-1]
- crawler = XiaoHongShuCrawler()
- account_pool = proxy_account_pool.create_account_pool()
- crawler.init_config("xhs", "cookie", account_pool)
- note_detail = None
- for _ in range(5):
- try:
- note_detail = await crawler.start(id=self.id)
- break
- except Exception as e:
- await asyncio.sleep(3)
- logger.error(f"error: {e}")
- logger.error(f"retrying...")
- if not note_detail:
- raise Exception("重试了这么多次还是无法签名成功,寄寄寄")
- # logger.debug(f"json_data: {json.dumps(note_detail, ensure_ascii=False, indent=4)}")
- parsed_data = self.process_note_json(note_detail)
- await self.process_xiaohongshu_note(parsed_data)
+ async def _get_xiaohongshu(self) -> None:
+ async with XhsSinglePostAdapter(
+ cookies=XHS_COOKIE_STRING,
+ sign_server_endpoint=XHS_SIGN_SERVER_URL,
+ ) as adapter:
+ result = await adapter.fetch_post(note_url=self.url)
+ note = result["note"]
+ self.id = note.get("note_id")
+ self.url = result["url"]
+ await self._process_xiaohongshu_note(note)
- async def process_xiaohongshu_note(self, json_data: dict):
+ async def _process_xiaohongshu_note(self, json_data: dict):
+ user = json_data.get("user", {}) or {}
self.title = json_data.get("title")
- self.author = json_data.get("author")
+ self.author = user.get("nickname")
if not self.title and self.author:
self.title = f"{self.author}的小红书笔记"
- self.author_url = "https://www.xiaohongshu.com/user/profile/" + json_data.get(
- "user_id"
+ self.author_url = (
+ "https://www.xiaohongshu.com/user/profile/" + user.get("user_id", "")
)
- self.raw_content = json_data.get("raw_content")
- logger.debug(f"{json_data.get('created')}")
+ self.raw_content = json_data.get("desc", "")
+ raw_time = json_data.get("time", 0)
+ raw_updated = json_data.get("last_update_time", 0)
self.created = (
- unix_timestamp_to_utc(json_data.get("created") / 1000)
- if json_data.get("created")
- else None
+ unix_timestamp_to_utc(int(raw_time) / 1000) if raw_time else None
)
self.updated = (
- unix_timestamp_to_utc(json_data.get("updated") / 1000)
- if json_data.get("updated")
- else None
+ unix_timestamp_to_utc(int(raw_updated) / 1000) if raw_updated else None
)
- self.like_count = json_data.get("like_count")
+ self.like_count = json_data.get("liked_count")
self.collected_count = json_data.get("collected_count")
self.comment_count = json_data.get("comment_count")
self.share_count = json_data.get("share_count")
self.ip_location = json_data.get("ip_location")
- if json_data.get("image_list"):
- for image_url in json_data.get("image_list"):
- self.media_files.append(MediaFile(url=image_url, media_type="image"))
- if json_data.get("video"):
- self.media_files.append(
- MediaFile(url=json_data.get("video"), media_type="video")
- )
+ for image_url in json_data.get("image_list", []) or []:
+ self.media_files.append(MediaFile(url=image_url, media_type="image"))
+ video_urls = json_data.get("video_urls", []) or []
+ if video_urls:
+ self.media_files.append(MediaFile(url=video_urls[0], media_type="video"))
data = self.__dict__
- data["raw_content"] = data["raw_content"].replace("\t", "")
+ raw_content = self.raw_content or ""
+ data["raw_content"] = raw_content.replace("\t", "")
if data["raw_content"].endswith("\n"):
data["raw_content"] = data["raw_content"][:-1]
self.text = short_text_template.render(data=data)
@@ -124,30 +88,7 @@ async def process_xiaohongshu_note(self, json_data: dict):
if media_file.media_type == "image":
             data["raw_content"] += f'<img src="{media_file.url}"/>'
elif media_file.media_type == "video":
- data[
- "raw_content"
- ] += (
+ data["raw_content"] += (
                f'<video src="{media_file.url}"></video>'
)
self.content = content_template.render(data=data)
-
- @staticmethod
- def process_note_json(json_data: dict):
- expression = """
- {
- title: title,
- raw_content: desc,
- author: user.nickname,
- user_id: user.user_id,
- image_list: image_list[*].url,
- video: video.media.stream.h264[0].master_url,
- like_count: interact_info.liked_count,
- collected_count: interact_info.collected_count,
- comment_count: interact_info.comment_count,
- share_count: interact_info.share_count,
- ip_location: ip_location,
- created: time,
- updated: last_update_time
- }
- """
- return jmespath.search(expression, json_data)
diff --git a/apps/api/src/services/scrapers/xiaohongshu/adaptar.py b/apps/api/src/services/scrapers/xiaohongshu/adaptar.py
new file mode 100644
index 0000000..a3f53ae
--- /dev/null
+++ b/apps/api/src/services/scrapers/xiaohongshu/adaptar.py
@@ -0,0 +1,542 @@
+from __future__ import annotations
+
+import json
+import re
+from typing import Any, Dict, List, Optional
+from urllib.parse import parse_qsl, urlencode, urlparse
+
+import httpx
+
+from fastfetchbot_shared.config import SIGN_SERVER_URL
+from fastfetchbot_shared.utils.logger import logger
+
+XHS_API_URL = "https://edith.xiaohongshu.com"
+XHS_WEB_URL = "https://www.xiaohongshu.com"
+
+
+def parse_xhs_note_url(note_url: str) -> Dict[str, str]:
+ """
+ Parse XHS note URL into note_id/xsec_token/xsec_source.
+ """
+ parsed = urlparse(note_url)
+ query = dict(parse_qsl(parsed.query, keep_blank_values=True))
+ path_parts = [part for part in parsed.path.split("/") if part]
+ if not path_parts:
+ raise ValueError(f"Invalid XHS note URL: {note_url}")
+ note_id = path_parts[-1]
+ if note_id in {"explore", "discovery", "item"}:
+ raise ValueError(f"Invalid XHS note URL path: {note_url}")
+ return {
+ "note_id": note_id,
+ "xsec_token": query.get("xsec_token", ""),
+ "xsec_source": query.get("xsec_source", ""),
+ }
+
+def get_pure_url(url: str) -> str:
+ """
+ Get the pure URL without query parameters or fragment.
+ """
+ parsed = urlparse(url)
+ return parsed.scheme + "://" + parsed.netloc + parsed.path
+
+
+class XhsSinglePostAdapter:
+ """Small adapter for fetching one Xiaohongshu post and optional comments."""
+
+ def __init__(
+ self,
+ cookies: str,
+ sign_server_endpoint: str = "",
+ timeout: float = 20.0,
+ ):
+ self.cookies = cookies.strip()
+ self.sign_server_endpoint = (sign_server_endpoint or SIGN_SERVER_URL).rstrip("/")
+ if not self.sign_server_endpoint:
+ raise ValueError(
+ "XhsSinglePostAdapter requires a sign server URL. "
+ "Set SIGN_SERVER_URL in the environment or pass sign_server_endpoint explicitly."
+ )
+ self.timeout = timeout
+ self._http = httpx.AsyncClient(timeout=timeout, follow_redirects=True)
+
+ async def close(self) -> None:
+ await self._http.aclose()
+
+ async def __aenter__(self) -> "XhsSinglePostAdapter":
+ return self
+
+ async def __aexit__(self, exc_type, exc, tb) -> None: # type: ignore[override]
+ await self.close()
+
+ def _base_headers(self) -> Dict[str, str]:
+ return {
+ "accept": "application/json, text/plain, */*",
+ "accept-language": "zh-CN,zh;q=0.9",
+ "cache-control": "no-cache",
+ "content-type": "application/json;charset=UTF-8",
+ "origin": XHS_WEB_URL,
+ "pragma": "no-cache",
+ "referer": f"{XHS_WEB_URL}/",
+ "sec-ch-ua": '"Chromium";v="136", "Google Chrome";v="136", "Not.A/Brand";v="99"',
+ "sec-ch-ua-mobile": "?0",
+ "sec-ch-ua-platform": '"Windows"',
+ "sec-fetch-dest": "empty",
+ "sec-fetch-mode": "cors",
+ "sec-fetch-site": "same-site",
+ "user-agent": (
+ "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
+ "AppleWebKit/537.36 (KHTML, like Gecko) "
+ "Chrome/137.0.0.0 Safari/537.36"
+ ),
+ "cookie": self.cookies,
+ }
+
+ async def _sign_headers(self, uri: str, data: Optional[Any] = None) -> Dict[str, str]:
+ payload = {"uri": uri, "data": data, "cookies": self.cookies}
+ resp = await self._http.post(
+ f"{self.sign_server_endpoint}/signsrv/v1/xhs/sign",
+ json=payload,
+ )
+ resp.raise_for_status()
+ body = resp.json()
+ if not body.get("isok"):
+ raise RuntimeError(f"XHS sign server returned error: {body}")
+ sign = body.get("data", {}) or {}
+ required = ["x_s", "x_t", "x_s_common", "x_b3_traceid"]
+ missing = [key for key in required if key not in sign]
+ if missing:
+ raise RuntimeError(f"XHS sign response missing fields: {missing}")
+ headers = self._base_headers()
+ headers.update(
+ {
+ "X-s": sign["x_s"],
+ "X-t": sign["x_t"],
+ "x-s-common": sign["x_s_common"],
+ "X-B3-Traceid": sign["x_b3_traceid"],
+ }
+ )
+ return headers
+
+ async def fetch_post(
+ self,
+ note_url: str,
+ with_comments: bool = False,
+ max_comments: int = 0,
+ include_sub_comments: bool = False,
+ ) -> Dict[str, Any]:
+ """
+ Fetch one XHS post by full URL (recommended with xsec_token + xsec_source).
+ """
+ if not note_url.startswith(XHS_WEB_URL):
+ note_url = await self._get_redirection_url(note_url)
+ url_info = parse_xhs_note_url(note_url)
+ try:
+ note = await self._fetch_note_by_api(
+ note_id=url_info["note_id"],
+ xsec_token=url_info["xsec_token"],
+ xsec_source=url_info["xsec_source"],
+ )
+ except Exception as e:
+ logger.error(f"Failed to fetch note by API: {url_info['note_id']}, error: {e}")
+ note = None
+
+ if note is None:
+ note = await self._fetch_note_by_html(
+ note_id=url_info["note_id"],
+ xsec_token=url_info["xsec_token"],
+ xsec_source=url_info["xsec_source"],
+ )
+ if note is None:
+ raise RuntimeError(f"Cannot fetch note detail from API or HTML: {url_info['note_id']}")
+
+ result: Dict[str, Any] = {"platform": "xhs", "note": note, "comments": [], "url": get_pure_url(note_url)}
+ if with_comments:
+ try:
+ result["comments"] = await self._fetch_comments(
+ note_id=url_info["note_id"],
+ xsec_token=url_info["xsec_token"],
+ max_comments=max_comments,
+ include_sub_comments=include_sub_comments,
+ )
+ except Exception as e:
+ logger.error(f"Failed to fetch comments for note {url_info['note_id']}, error: {e}")
+ result["comments"] = []
+ return result
+
+ async def _signed_post(self, uri: str, data: Dict[str, Any]) -> Dict[str, Any]:
+ headers = await self._sign_headers(uri=uri, data=data)
+ json_str = json.dumps(data, separators=(",", ":"), ensure_ascii=False)
+ resp = await self._http.post(
+ f"{XHS_API_URL}{uri}",
+ content=json_str.encode("utf-8"),
+ headers=headers,
+ )
+ return self._parse_api_response(resp)
+
+ async def _signed_get(self, uri: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
+ params = params or {}
+ final_uri = uri
+ if params:
+ final_uri = f"{uri}?{urlencode(params, doseq=True)}"
+ headers = await self._sign_headers(uri=final_uri)
+ resp = await self._http.get(
+ f"{XHS_API_URL}{uri}",
+ params=params,
+ headers=headers,
+ )
+ return self._parse_api_response(resp)
+
+    @staticmethod
+    def _parse_api_response(resp: httpx.Response) -> Dict[str, Any]:
+        # Check for captcha blocks before parsing: blocked responses may not carry JSON bodies.
+        if resp.status_code in (461, 471):
+            verify_type = resp.headers.get("Verifytype", "")
+            verify_uuid = resp.headers.get("Verifyuuid", "")
+            raise RuntimeError(
+                f"XHS blocked by captcha: verify_type={verify_type}, verify_uuid={verify_uuid}"
+            )
+
+        try:
+            body = resp.json()
+        except json.JSONDecodeError as exc:
+            raise RuntimeError(f"XHS API returned non-JSON: status={resp.status_code}") from exc
+
+        if body.get("success"):
+            return body.get("data", {}) or {}
+        raise RuntimeError(f"XHS API error: status={resp.status_code}, body={body}")
+
+ async def _fetch_note_by_api(
+ self,
+ note_id: str,
+ xsec_token: str,
+ xsec_source: str,
+ ) -> Optional[Dict[str, Any]]:
+ data: Dict[str, Any] = {
+ "source_note_id": note_id,
+ "image_formats": ["jpg", "webp", "avif"],
+ "extra": {"need_body_topic": 1},
+ }
+ if xsec_token:
+ data["xsec_token"] = xsec_token
+ data["xsec_source"] = xsec_source
+
+ payload = await self._signed_post("/api/sns/web/v1/feed", data=data)
+ items = payload.get("items", []) or []
+ if not items:
+ return None
+ card = (items[0] or {}).get("note_card", {})
+ if not card:
+ return None
+ card["xsec_token"] = xsec_token
+ card["xsec_source"] = xsec_source
+ return self._normalize_note(card)
+
+ async def _fetch_note_by_html(
+ self,
+ note_id: str,
+ xsec_token: str,
+ xsec_source: str,
+ ) -> Optional[Dict[str, Any]]:
+ detail_url = f"{XHS_WEB_URL}/explore/{note_id}"
+ if xsec_token:
+ detail_url += f"?xsec_token={xsec_token}&xsec_source={xsec_source}"
+
+ resp = await self._http.get(detail_url, headers=self._base_headers())
+ if resp.status_code != 200:
+ return None
+
+ text = resp.text or ""
+        match = re.search(r"window\.__INITIAL_STATE__=({.*})", text)
+ if not match:
+ return None
+
+ try:
+ state = json.loads(match.group(1).replace("undefined", "null"))
+ note = (
+ state.get("note", {})
+ .get("noteDetailMap", {})
+ .get(note_id, {})
+ .get("note")
+ )
+ except json.JSONDecodeError:
+ return None
+
+ if not note:
+ return None
+ note["xsec_token"] = xsec_token
+ note["xsec_source"] = xsec_source
+ return self._normalize_note(note)
+
+ async def _fetch_comments(
+ self,
+ note_id: str,
+ xsec_token: str = "",
+ max_comments: int = 0,
+ include_sub_comments: bool = False,
+ ) -> List[Dict[str, Any]]:
+ comments: List[Dict[str, Any]] = []
+ cursor = ""
+ has_more = True
+
+ while has_more:
+ params: Dict[str, Any] = {
+ "note_id": note_id,
+ "cursor": cursor,
+ "top_comment_id": "",
+ "image_formats": "jpg,webp,avif",
+ }
+ if xsec_token:
+ params["xsec_token"] = xsec_token
+
+ payload = await self._signed_get("/api/sns/web/v2/comment/page", params=params)
+ raw_comments = payload.get("comments", []) or []
+ for item in raw_comments:
+ comments.append(
+ self._normalize_comment(
+ note_id=note_id,
+ note_xsec_token=xsec_token,
+ raw=item,
+ root_comment_id="",
+ )
+ )
+ if include_sub_comments:
+ comments.extend(
+ await self._fetch_sub_comments(
+ note_id=note_id,
+ root_comment=item,
+ xsec_token=xsec_token,
+ )
+ )
+ if max_comments > 0 and len(comments) >= max_comments:
+ return comments[:max_comments]
+
+ has_more = bool(payload.get("has_more", False))
+ cursor = str(payload.get("cursor", "") or "")
+ if not cursor and has_more:
+ break
+
+ return comments
+
+ async def _fetch_sub_comments(
+ self,
+ note_id: str,
+ root_comment: Dict[str, Any],
+ xsec_token: str,
+ ) -> List[Dict[str, Any]]:
+ results: List[Dict[str, Any]] = []
+ inline_sub_comments = root_comment.get("sub_comments", []) or []
+ root_comment_id = str(root_comment.get("id", ""))
+
+ for item in inline_sub_comments:
+ results.append(
+ self._normalize_comment(
+ note_id=note_id,
+ note_xsec_token=xsec_token,
+ raw=item,
+ root_comment_id=root_comment_id,
+ )
+ )
+
+ has_more = bool(root_comment.get("sub_comment_has_more", False))
+ cursor = str(root_comment.get("sub_comment_cursor", "") or "")
+ while has_more:
+ params: Dict[str, Any] = {
+ "note_id": note_id,
+ "root_comment_id": root_comment_id,
+ "num": 10,
+ "cursor": cursor,
+ }
+ if xsec_token:
+ params["xsec_token"] = xsec_token
+ payload = await self._signed_get("/api/sns/web/v2/comment/sub/page", params=params)
+ sub_comments = payload.get("comments", []) or []
+ for item in sub_comments:
+ results.append(
+ self._normalize_comment(
+ note_id=note_id,
+ note_xsec_token=xsec_token,
+ raw=item,
+ root_comment_id=root_comment_id,
+ )
+ )
+ has_more = bool(payload.get("has_more", False))
+ cursor = str(payload.get("cursor", "") or "")
+ if not cursor and has_more:
+ break
+ return results
+
+ def _normalize_note(self, note_item: Dict[str, Any]) -> Dict[str, Any]:
+ def _pick(data: Dict[str, Any], *keys: str, default: Any = None) -> Any:
+ if not isinstance(data, dict):
+ return default
+ for key in keys:
+ if key in data and data.get(key) is not None:
+ return data.get(key)
+ return default
+
+ def _to_int(value: Any) -> int:
+ try:
+ return int(value or 0)
+ except (TypeError, ValueError):
+ return 0
+
+ note_type = str(_pick(note_item, "type", default=""))
+ user = _pick(note_item, "user", default={}) or {}
+ interact = _pick(note_item, "interact_info", "interactInfo", default={}) or {}
+ image_list = []
+ if note_type != "video":
+ for item in _pick(note_item, "image_list", "imageList", default=[]) or []:
+ url = (
+ _pick(item, "url_default", "urlDefault", "url")
+ if isinstance(item, dict)
+ else None
+ )
+ if url:
+ image_list.append(str(url))
+
+ tag_list = []
+ for tag in _pick(note_item, "tag_list", "tagList", default=[]) or []:
+ if not isinstance(tag, dict):
+ continue
+ if tag.get("type") == "topic" and tag.get("name"):
+ tag_list.append(str(tag["name"]))
+
+ video_urls: List[str] = self._extract_video_urls(note_item)
+ if not video_urls and isinstance(note_item.get("note"), dict):
+ # Some payloads wrap the note body under `note`.
+ video_urls = self._extract_video_urls(note_item["note"])
+ note_id = str(_pick(note_item, "note_id", "noteId", default=""))
+ xsec_token = str(_pick(note_item, "xsec_token", "xsecToken", default=""))
+ xsec_source = str(
+ _pick(note_item, "xsec_source", "xsecSource", default="pc_search")
+ or "pc_search"
+ )
+ note_url = (
+ f"{XHS_WEB_URL}/explore/{note_id}"
+ f"?xsec_token={xsec_token}&xsec_source={xsec_source}"
+ )
+
+ return {
+ "note_id": note_id,
+ "type": note_type,
+ "title": str(
+ _pick(note_item, "title", default="")
+ or str(_pick(note_item, "desc", default=""))[:255]
+ ),
+ "desc": str(_pick(note_item, "desc", default="")),
+ "video_urls": video_urls,
+ "time": str(_pick(note_item, "time", default="")),
+ "last_update_time": str(
+ _pick(note_item, "last_update_time", "lastUpdateTime", default="")
+ ),
+ "ip_location": str(_pick(note_item, "ip_location", "ipLocation", default="")),
+ "image_list": image_list,
+ "tag_list": tag_list,
+ "url": note_url,
+ "note_url": note_url,
+ "liked_count": _to_int(_pick(interact, "liked_count", "likedCount", default=0)),
+ "collected_count": _to_int(
+ _pick(interact, "collected_count", "collectedCount", default=0)
+ ),
+ "comment_count": _to_int(
+ _pick(interact, "comment_count", "commentCount", default=0)
+ ),
+ "share_count": _to_int(_pick(interact, "share_count", "shareCount", default=0)),
+ "user": {
+ "user_id": str(_pick(user, "user_id", "userId", default="")),
+ "nickname": str(_pick(user, "nickname", default="")),
+ "avatar": str(_pick(user, "avatar", "image", default="")),
+ },
+ }
+
+ @staticmethod
+ def _extract_video_urls(note_item: Dict[str, Any]) -> List[str]:
+ note_type = str(note_item.get("type", "") or "")
+ video = note_item.get("video", {}) or {}
+ if note_type != "video" and not isinstance(video, dict):
+ return []
+
+ consumer = video.get("consumer", {}) or {}
+ origin_video_key = consumer.get("origin_video_key") or consumer.get("originVideoKey")
+ if origin_video_key:
+ return [f"http://sns-video-bd.xhscdn.com/{origin_video_key}"]
+
+ urls: List[str] = []
+ stream = (video.get("media", {}) or {}).get("stream", {}) or {}
+ # Prefer broadly compatible streams first, then fallback to newer codecs.
+ for stream_key in ("h264", "h265", "av1", "h266"):
+ for item in stream.get(stream_key, []) or []:
+ if not isinstance(item, dict):
+ continue
+ master_url = item.get("master_url") or item.get("masterUrl")
+ if master_url:
+ urls.append(str(master_url))
+ for backup_url in item.get("backup_urls", []) or item.get("backupUrls", []) or []:
+ if backup_url:
+ urls.append(str(backup_url))
+
+ # Keep ordering while deduplicating.
+ seen = set()
+ deduped: List[str] = []
+ for url in urls:
+ if url not in seen:
+ seen.add(url)
+ deduped.append(url)
+ return deduped
+
+ def _normalize_comment(
+ self,
+ note_id: str,
+ note_xsec_token: str,
+ raw: Dict[str, Any],
+ root_comment_id: str = "",
+ ) -> Dict[str, Any]:
+ user = raw.get("user_info", {}) or {}
+ target = raw.get("target_comment", {}) or {}
+ pics = []
+ for item in raw.get("pictures", []) or []:
+ url = item.get("url_default")
+ if url:
+ pics.append(url)
+
+ return {
+ "comment_id": str(raw.get("id", "")),
+ "parent_comment_id": root_comment_id,
+ "target_comment_id": str(target.get("id", "")),
+ "note_id": note_id,
+ "content": str(raw.get("content", "")),
+ "create_time": str(raw.get("create_time", "")),
+ "ip_location": str(raw.get("ip_location", "")),
+ "sub_comment_count": int(raw.get("sub_comment_count", 0) or 0),
+ "like_count": int(raw.get("like_count", 0) or 0),
+ "pictures": pics,
+ "note_url": (
+ f"{XHS_WEB_URL}/explore/{note_id}"
+ f"?xsec_token={note_xsec_token}&xsec_source=pc_search"
+ ),
+ "user": {
+ "user_id": str(user.get("user_id", "")),
+ "nickname": str(user.get("nickname", "")),
+ "avatar": str(user.get("image", "")),
+ },
+ }
+
+ async def _get_redirection_url(self, note_url: str) -> str:
+ """Follow redirects from a short URL (e.g. xhslink.com) to the full xiaohongshu.com URL."""
+ redirect_headers = {
+ "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
+ "accept-language": "zh-CN,zh;q=0.9,en;q=0.8",
+ "user-agent": (
+ "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
+ "AppleWebKit/537.36 (KHTML, like Gecko) "
+ "Chrome/137.0.0.0 Safari/537.36"
+ ),
+ }
+ async with httpx.AsyncClient(timeout=self.timeout, follow_redirects=True) as client:
+ resp = await client.get(note_url, headers=redirect_headers)
+
+ final_url = str(resp.url)
+ if XHS_WEB_URL not in final_url and "xiaohongshu.com" not in final_url:
+ raise RuntimeError(
+ f"Short URL did not redirect to xiaohongshu.com: {note_url} -> {final_url}"
+ )
+ return final_url
diff --git a/apps/api/src/services/scrapers/xiaohongshu/xhs/__init__.py b/apps/api/src/services/scrapers/xiaohongshu/xhs/__init__.py
deleted file mode 100644
index 947f034..0000000
--- a/apps/api/src/services/scrapers/xiaohongshu/xhs/__init__.py
+++ /dev/null
@@ -1,2 +0,0 @@
-from .core import XiaoHongShuCrawler
-from .field import *
diff --git a/apps/api/src/services/scrapers/xiaohongshu/xhs/base_crawler.py b/apps/api/src/services/scrapers/xiaohongshu/xhs/base_crawler.py
deleted file mode 100644
index d00b38a..0000000
--- a/apps/api/src/services/scrapers/xiaohongshu/xhs/base_crawler.py
+++ /dev/null
@@ -1,35 +0,0 @@
-from abc import ABC, abstractmethod
-
-from .proxy_account_pool import AccountPool
-
-
-class AbstractCrawler(ABC):
- @abstractmethod
- def init_config(self, platform: str, login_type: str, account_pool: AccountPool):
- pass
-
- @abstractmethod
- async def start(self):
- pass
-
- @abstractmethod
- async def search(self):
- pass
-
-
-class AbstractLogin(ABC):
- @abstractmethod
- async def begin(self):
- pass
-
- @abstractmethod
- async def login_by_qrcode(self):
- pass
-
- @abstractmethod
- async def login_by_mobile(self):
- pass
-
- @abstractmethod
- async def login_by_cookies(self):
- pass
diff --git a/apps/api/src/services/scrapers/xiaohongshu/xhs/client.py b/apps/api/src/services/scrapers/xiaohongshu/xhs/client.py
deleted file mode 100644
index 367ba6a..0000000
--- a/apps/api/src/services/scrapers/xiaohongshu/xhs/client.py
+++ /dev/null
@@ -1,217 +0,0 @@
-import asyncio
-import json
-from typing import Dict
-
-import httpx
-from playwright.async_api import BrowserContext, Page
-
-from fastfetchbot_shared.utils.logger import logger
-from .exception import DataFetchError, IPBlockError
-from .field import SearchNoteType, SearchSortType
-from .help import get_search_id, sign
-from . import utils
-
-
-class XHSClient:
- def __init__(
- self,
- timeout=10,
- proxies=None,
- *,
- headers: Dict[str, str],
- playwright_page: Page,
- cookie_dict: Dict[str, str],
- ):
- self.proxies = proxies
- self.timeout = timeout
- self.headers = headers
- self._host = "https://edith.xiaohongshu.com"
- self.IP_ERROR_STR = "网络连接异常,请检查网络设置或重启试试"
- self.IP_ERROR_CODE = 300012
- self.NOTE_ABNORMAL_STR = "笔记状态异常,请稍后查看"
- self.NOTE_ABNORMAL_CODE = -510001
- self.playwright_page = playwright_page
- self.cookie_dict = cookie_dict
-
- async def _pre_headers(self, url: str, data=None):
- encrypt_params = await self.playwright_page.evaluate(
- "([url, data]) => window._webmsxyw(url,data)", [url, data]
- )
- local_storage = await self.playwright_page.evaluate("() => window.localStorage")
- signs = sign(
- a1=self.cookie_dict.get("a1", ""),
- b1=local_storage.get("b1", ""),
- x_s=encrypt_params.get("X-s", ""),
- x_t=str(encrypt_params.get("X-t", "")),
- )
-
- headers = {
- "X-S": signs["x-s"],
- "X-T": signs["x-t"],
- "x-S-Common": signs["x-s-common"],
- "X-B3-Traceid": signs["x-b3-traceid"],
- }
- self.headers.update(headers)
- return self.headers
-
- async def request(self, method, url, **kwargs) -> Dict:
- async with httpx.AsyncClient(proxies=self.proxies) as client:
- response = await client.request(method, url, timeout=self.timeout, **kwargs)
- data: Dict = response.json()
- if data["success"]:
- return data.get("data", data.get("success", {}))
- elif data["code"] == self.IP_ERROR_CODE:
- raise IPBlockError(self.IP_ERROR_STR)
- else:
- raise DataFetchError(data.get("msg", None))
-
- async def get(self, uri: str, params=None) -> Dict:
- final_uri = uri
- if isinstance(params, dict):
- final_uri = f"{uri}?" f"{'&'.join([f'{k}={v}' for k, v in params.items()])}"
- headers = await self._pre_headers(final_uri)
- return await self.request(
- method="GET", url=f"{self._host}{final_uri}", headers=headers
- )
-
- async def post(self, uri: str, data: dict) -> Dict:
- headers = await self._pre_headers(uri, data)
- json_str = json.dumps(data, separators=(",", ":"), ensure_ascii=False)
- return await self.request(
- method="POST", url=f"{self._host}{uri}", data=json_str, headers=headers
- )
-
- async def ping(self) -> bool:
- """get a note to check if login state is ok"""
- logger.info("Begin to ping xhs...")
- ping_flag = False
- try:
- note_card: Dict = await self.get_note_by_keyword(keyword="小红书")
- if note_card.get("items"):
- ping_flag = True
- except Exception as e:
- logger.error(f"Ping xhs failed: {e}, and try to login again...")
- ping_flag = False
- return ping_flag
-
- async def update_cookies(self, browser_context: BrowserContext):
- cookie_str, cookie_dict = utils.convert_cookies(await browser_context.cookies())
- self.headers["Cookie"] = cookie_str
- self.cookie_dict = cookie_dict
-
- async def get_note_by_keyword(
- self,
- keyword: str,
- page: int = 1,
- page_size: int = 20,
- sort: SearchSortType = SearchSortType.GENERAL,
- note_type: SearchNoteType = SearchNoteType.ALL,
- ) -> Dict:
- """search note by keyword
-
- :param keyword: what notes you want to search
- :param page: page number, defaults to 1
- :param page_size: page size, defaults to 20
- :param sort: sort ordering, defaults to SearchSortType.GENERAL
- :param note_type: note type, defaults to SearchNoteType.ALL
- :return: {has_more: true, items: []}
- """
- uri = "/api/sns/web/v1/search/notes"
- data = {
- "keyword": keyword,
- "page": page,
- "page_size": page_size,
- "search_id": get_search_id(),
- "sort": sort.value,
- "note_type": note_type.value,
- }
- return await self.post(uri, data)
-
- async def get_note_by_id(self, note_id: str) -> Dict:
- """
- :param note_id: note_id you want to fetch
- :return: {"time":1679019883000,"user":{"nickname":"nickname","avatar":"avatar","user_id":"user_id"},"image_list":[{"url":"https://sns-img-qc.xhscdn.com/c8e505ca-4e5f-44be-fe1c-ca0205a38bad","trace_id":"1000g00826s57r6cfu0005ossb1e9gk8c65d0c80","file_id":"c8e505ca-4e5f-44be-fe1c-ca0205a38bad","height":1920,"width":1440}],"tag_list":[{"id":"5be78cdfdb601f000100d0bc","name":"jk","type":"topic"}],"desc":"裙裙","interact_info":{"followed":false,"liked":false,"liked_count":"1732","collected":false,"collected_count":"453","comment_count":"30","share_count":"41"},"at_user_list":[],"last_update_time":1679019884000,"note_id":"6413cf6b00000000270115b5","type":"normal","title":"title"}
- """
- data = {"source_note_id": note_id}
- uri = "/api/sns/web/v1/feed"
- res = await self.post(uri, data)
- res_dict: Dict = res["items"][0]["note_card"]
- return res_dict
-
- async def get_note_comments(self, note_id: str, cursor: str = "") -> Dict:
- """get note comments
- :param note_id: note id you want to fetch
- :param cursor: last you get cursor, defaults to ""
- :return: {"has_more": true,"cursor": "6422442d000000000700dcdb",comments: [],"user_id": "63273a77000000002303cc9b","time": 1681566542930}
- """
- uri = "/api/sns/web/v2/comment/page"
- params = {"note_id": note_id, "cursor": cursor}
- return await self.get(uri, params)
-
- async def get_note_sub_comments(
- self, note_id: str, root_comment_id: str, num: int = 30, cursor: str = ""
- ):
- """
- get note sub comments
- :param note_id: note id you want to fetch
- :param root_comment_id: parent comment id
- :param num: recommend 30, if num greater 30, it only return 30 comments
- :param cursor: last you get cursor, defaults to ""
- :return: {"has_more": true,"cursor": "6422442d000000000700dcdb",comments: [],"user_id": "63273a77000000002303cc9b","time": 1681566542930}
- """
- uri = "/api/sns/web/v2/comment/sub/page"
- params = {
- "note_id": note_id,
- "root_comment_id": root_comment_id,
- "num": num,
- "cursor": cursor,
- }
- return await self.get(uri, params)
-
- async def get_note_all_comments(
- self, note_id: str, crawl_interval: float = 1.0, is_fetch_sub_comments=False
- ):
- """
- get note all comments include sub comments
- :param note_id:
- :param crawl_interval:
- :param is_fetch_sub_comments:
- :return:
- """
-
- result = []
- comments_has_more = True
- comments_cursor = ""
- while comments_has_more:
- comments_res = await self.get_note_comments(note_id, comments_cursor)
- comments_has_more = comments_res.get("has_more", False)
- comments_cursor = comments_res.get("cursor", "")
- comments = comments_res["comments"]
- if not is_fetch_sub_comments:
- result.extend(comments)
- continue
- # handle get sub comments
- for comment in comments:
- result.append(comment)
- cur_sub_comment_count = int(comment["sub_comment_count"])
- cur_sub_comments = comment["sub_comments"]
- result.extend(cur_sub_comments)
- sub_comments_has_more = (
- comment["sub_comment_has_more"]
- and len(cur_sub_comments) < cur_sub_comment_count
- )
- sub_comment_cursor = comment["sub_comment_cursor"]
- while sub_comments_has_more:
- page_num = 30
- sub_comments_res = await self.get_note_sub_comments(
- note_id, comment["id"], num=page_num, cursor=sub_comment_cursor
- )
- sub_comments = sub_comments_res["comments"]
- sub_comments_has_more = (
- sub_comments_res["has_more"] and len(sub_comments) == page_num
- )
- sub_comment_cursor = sub_comments_res["cursor"]
- result.extend(sub_comments)
- await asyncio.sleep(crawl_interval)
- await asyncio.sleep(crawl_interval)
- return result
diff --git a/apps/api/src/services/scrapers/xiaohongshu/xhs/core.py b/apps/api/src/services/scrapers/xiaohongshu/xhs/core.py
deleted file mode 100644
index 8e450a7..0000000
--- a/apps/api/src/services/scrapers/xiaohongshu/xhs/core.py
+++ /dev/null
@@ -1,225 +0,0 @@
-import asyncio
-import random
-from asyncio import Task
-from typing import Dict, List, Optional, Tuple
-
-from playwright.async_api import BrowserContext, BrowserType, Page, async_playwright
-
-from .base_crawler import AbstractCrawler
-from .proxy_account_pool import AccountPool
-
-# import xiaohongshu as xhs_model
-from fastfetchbot_shared.utils.logger import logger
-from src import config
-
-# from var import request_keyword_var
-from .client import XHSClient
-from .exception import DataFetchError
-from .login import XHSLogin
-from . import utils
-
-
-class XiaoHongShuCrawler(AbstractCrawler):
- platform: str
- login_type: str
- context_page: Page
- xhs_client: XHSClient
- account_pool: AccountPool
- browser_context: BrowserContext
-
- def __init__(self) -> None:
- self.index_url = "https://www.xiaohongshu.com"
- self.user_agent = utils.get_user_agent()
-
- def init_config(
- self, platform: str, login_type: str, account_pool: AccountPool
- ) -> None:
- self.platform = platform
- self.login_type = login_type
- self.account_pool = account_pool
-
- async def start(self, id: str) -> dict:
- account_phone, playwright_proxy, httpx_proxy = self.create_proxy_info()
- async with async_playwright() as playwright:
- # Launch a browser context.
- chromium = playwright.chromium
- self.browser_context = await self.launch_browser(
- chromium, playwright_proxy, self.user_agent, headless=True
- )
- # stealth.min.js is a js script to prevent the website from detecting the crawler.
- await self.browser_context.add_init_script(path="app/utils/stealth.min.js")
- # add a cookie attribute webId to avoid the appearance of a sliding captcha on the webpage
- await self.browser_context.add_cookies(
- [
- {
- "name": "webId",
- "value": "xxx123", # any value
- "domain": ".xiaohongshu.com",
- "path": "/",
- }
- ]
- )
- await asyncio.sleep(1)
- self.context_page = await self.browser_context.new_page()
- await asyncio.sleep(1)
- await self.context_page.goto(self.index_url)
- await asyncio.sleep(1)
- # Create a client to interact with the xiaohongshu website.
- self.xhs_client = await self.create_xhs_client(httpx_proxy)
- if not await self.xhs_client.ping():
- login_obj = XHSLogin(
- login_type=self.login_type,
- login_phone=account_phone,
- browser_context=self.browser_context,
- context_page=self.context_page,
- cookie_str=config.XIAOHONGSHU_COOKIES,
- )
- await asyncio.sleep(1)
- await login_obj.begin()
- await asyncio.sleep(1)
- await self.xhs_client.update_cookies(
- browser_context=self.browser_context
- )
-
- # Search for notes and retrieve their comment information.
- # await self.search()
-
- logger.info("Xhs Crawler finished ...")
- return await self.xhs_client.get_note_by_id(id)
-
- async def search(self) -> None:
- """Search for notes and retrieve their comment information."""
- logger.info("Begin search xiaohongshu keywords")
- xhs_limit_count = 20 # xhs limit page fixed value
- for keyword in config.KEYWORDS.split(","):
- # set keyword to context var
- # request_keyword_var.set(keyword)
- logger.info(f"Current search keyword: {keyword}")
- page = 1
- while page * xhs_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
- note_id_list: List[str] = []
- notes_res = await self.xhs_client.get_note_by_keyword(
- keyword=keyword,
- page=page,
- )
- semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
- task_list = [
- self.get_note_detail(post_item.get("id"), semaphore)
- for post_item in notes_res.get("items", {})
- if post_item.get("model_type") not in ("rec_query", "hot_query")
- ]
- note_details = await asyncio.gather(*task_list)
- for note_detail in note_details:
- if note_detail is not None:
- # await xhs_model.update_xhs_note(note_detail)
- note_id_list.append(note_detail.get("note_id"))
- page += 1
- logger.info(f"Note details: {note_details}")
- await self.batch_get_note_comments(note_id_list)
-
- async def get_note_detail(
- self, note_id: str, semaphore: asyncio.Semaphore
- ) -> Optional[Dict]:
- """Get note detail"""
- async with semaphore:
- try:
- return await self.xhs_client.get_note_by_id(note_id)
- except DataFetchError as ex:
- logger.error(f"Get note detail error: {ex}")
- return None
-
- async def batch_get_note_comments(self, note_list: List[str]):
- """Batch get note comments"""
- logger.info(f"Begin batch get note comments, note list: {note_list}")
- semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
- task_list: List[Task] = []
- for note_id in note_list:
- task = asyncio.create_task(
- self.get_comments(note_id, semaphore), name=note_id
- )
- task_list.append(task)
- await asyncio.gather(*task_list)
-
- async def get_comments(self, note_id: str, semaphore: asyncio.Semaphore):
- """Get note comments"""
- async with semaphore:
- logger.info(f"Begin get note id comments {note_id}")
- all_comments = await self.xhs_client.get_note_all_comments(
- note_id=note_id, crawl_interval=random.random()
- )
- # for comment in all_comments:
- # await xhs_model.update_xhs_note_comment(
- # note_id=note_id, comment_item=comment
- # )
-
- def create_proxy_info(self) -> Tuple[Optional[str], Optional[Dict], Optional[str]]:
- """Create proxy info for playwright and httpx"""
- # phone: 13012345671 ip_proxy: 111.122.xx.xx1:8888
- phone, ip_proxy = self.account_pool.get_account()
- if not config.XHS_ENABLE_IP_PROXY:
- return phone, None, None
- logger.info("Begin proxy info for playwright and httpx ...")
- playwright_proxy = {
- "server": f"{config.IP_PROXY_PROTOCOL}{ip_proxy}",
- "username": config.IP_PROXY_USER,
- "password": config.IP_PROXY_PASSWORD,
- }
- httpx_proxy = f"{config.IP_PROXY_PROTOCOL}{config.IP_PROXY_USER}:{config.IP_PROXY_PASSWORD}@{ip_proxy}"
- return phone, playwright_proxy, httpx_proxy
-
- async def create_xhs_client(self, httpx_proxy: Optional[str]) -> XHSClient:
- """Create xhs client"""
- logger.info("Begin create xiaohongshu API client ...")
- cookie_str, cookie_dict = utils.convert_cookies(
- await self.browser_context.cookies()
- )
- xhs_client_obj = XHSClient(
- proxies=httpx_proxy,
- headers={
- "User-Agent": self.user_agent,
- "Cookie": cookie_str,
- "Origin": "https://www.xiaohongshu.com",
- "Referer": "https://www.xiaohongshu.com",
- "Content-Type": "application/json;charset=UTF-8",
- },
- playwright_page=self.context_page,
- cookie_dict=cookie_dict,
- )
- return xhs_client_obj
-
- async def launch_browser(
- self,
- chromium: BrowserType,
- playwright_proxy: Optional[Dict],
- user_agent: Optional[str],
- headless: bool = True,
- ) -> BrowserContext:
- """Launch browser and create browser context"""
- logger.info("Begin create browser context ...")
- if config.XHS_SAVE_LOGIN_STATE:
- # feat issue #14
- # we will save login state to avoid login every time
- user_data_dir = config.TEMP_DIR
- # user_data_dir = os.path.join(
- # os.getcwd(), "browser_data", self.platform
- # ) # type: ignore
- browser_context = await chromium.launch_persistent_context(
- user_data_dir=user_data_dir,
- accept_downloads=True,
- headless=headless,
- proxy=playwright_proxy, # type: ignore
- viewport={"width": 1920, "height": 1080},
- user_agent=user_agent,
- )
- return browser_context
- else:
- browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore
- browser_context = await browser.new_context(
- viewport={"width": 1920, "height": 1080}, user_agent=user_agent
- )
- return browser_context
-
- async def close(self):
- """Close browser context"""
- await self.browser_context.close()
- logger.info("Browser context closed ...")
diff --git a/apps/api/src/services/scrapers/xiaohongshu/xhs/exception.py b/apps/api/src/services/scrapers/xiaohongshu/xhs/exception.py
deleted file mode 100644
index 1a8642e..0000000
--- a/apps/api/src/services/scrapers/xiaohongshu/xhs/exception.py
+++ /dev/null
@@ -1,9 +0,0 @@
-from httpx import RequestError
-
-
-class DataFetchError(RequestError):
- """something error when fetch"""
-
-
-class IPBlockError(RequestError):
- """fetch so fast that the server block us ip"""
diff --git a/apps/api/src/services/scrapers/xiaohongshu/xhs/field.py b/apps/api/src/services/scrapers/xiaohongshu/xhs/field.py
deleted file mode 100644
index fbac0d2..0000000
--- a/apps/api/src/services/scrapers/xiaohongshu/xhs/field.py
+++ /dev/null
@@ -1,72 +0,0 @@
-from enum import Enum
-from typing import NamedTuple
-
-
-class FeedType(Enum):
- # recommend
- RECOMMEND = "homefeed_recommend"
- # outfits
- FASION = "homefeed.fashion_v3"
- # food
- FOOD = "homefeed.food_v3"
- # makeup
- COSMETICS = "homefeed.cosmetics_v3"
- # movies & TV
- MOVIE = "homefeed.movie_and_tv_v3"
- # career
- CAREER = "homefeed.career_v3"
- # relationships
- EMOTION = "homefeed.love_v3"
- # home
- HOURSE = "homefeed.household_product_v3"
- # gaming
- GAME = "homefeed.gaming_v3"
- # travel
- TRAVEL = "homefeed.travel_v3"
- # fitness
- FITNESS = "homefeed.fitness_v3"
-
-
-class NoteType(Enum):
- NORMAL = "normal"
- VIDEO = "video"
-
-
-class SearchSortType(Enum):
- """search sort type"""
- # default
- GENERAL = "general"
- # most popular
- MOST_POPULAR = "popularity_descending"
- # Latest
- LATEST = "time_descending"
-
-
-class SearchNoteType(Enum):
- """search note type
- """
- # default
- ALL = 0
- # only video
- VIDEO = 1
- # only image
- IMAGE = 2
-
-
-class Note(NamedTuple):
- """note tuple"""
- note_id: str
- title: str
- desc: str
- type: str
- user: dict
- img_urls: list
- video_url: str
- tag_list: list
- at_user_list: list
- collected_count: str
- comment_count: str
- liked_count: str
- share_count: str
- time: int
- last_update_time: int
diff --git a/apps/api/src/services/scrapers/xiaohongshu/xhs/help.py b/apps/api/src/services/scrapers/xiaohongshu/xhs/help.py
deleted file mode 100644
index c1e191f..0000000
--- a/apps/api/src/services/scrapers/xiaohongshu/xhs/help.py
+++ /dev/null
@@ -1,262 +0,0 @@
-import ctypes
-import json
-import random
-import time
-import urllib.parse
-
-
-def sign(a1="", b1="", x_s="", x_t=""):
- """
- takes in a URI (uniform resource identifier), an optional data dictionary, and an optional ctime parameter. It returns a dictionary containing two keys: "x-s" and "x-t".
- """
- common = {
- "s0": 5, # getPlatformCode
- "s1": "",
- "x0": "1", # localStorage.getItem("b1b1")
- "x1": "3.3.0", # version
- "x2": "Windows",
- "x3": "xhs-pc-web",
- "x4": "1.4.4",
- "x5": a1, # cookie of a1
- "x6": x_t,
- "x7": x_s,
- "x8": b1, # localStorage.getItem("b1")
- "x9": mrc(x_t + x_s + b1),
- "x10": 1, # getSigCount
- }
- encode_str = encodeUtf8(json.dumps(common, separators=(',', ':')))
- x_s_common = b64Encode(encode_str)
- x_b3_traceid = get_b3_trace_id()
- return {
- "x-s": x_s,
- "x-t": x_t,
- "x-s-common": x_s_common,
- "x-b3-traceid": x_b3_traceid
- }
-
-
-def get_b3_trace_id():
- re = "abcdef0123456789"
- je = 16
- e = ""
- for t in range(16):
- e += re[random.randint(0, je - 1)]
- return e
-
-
-def mrc(e):
- ie = [
- 0, 1996959894, 3993919788, 2567524794, 124634137, 1886057615, 3915621685,
- 2657392035, 249268274, 2044508324, 3772115230, 2547177864, 162941995,
- 2125561021, 3887607047, 2428444049, 498536548, 1789927666, 4089016648,
- 2227061214, 450548861, 1843258603, 4107580753, 2211677639, 325883990,
- 1684777152, 4251122042, 2321926636, 335633487, 1661365465, 4195302755,
- 2366115317, 997073096, 1281953886, 3579855332, 2724688242, 1006888145,
- 1258607687, 3524101629, 2768942443, 901097722, 1119000684, 3686517206,
- 2898065728, 853044451, 1172266101, 3705015759, 2882616665, 651767980,
- 1373503546, 3369554304, 3218104598, 565507253, 1454621731, 3485111705,
- 3099436303, 671266974, 1594198024, 3322730930, 2970347812, 795835527,
- 1483230225, 3244367275, 3060149565, 1994146192, 31158534, 2563907772,
- 4023717930, 1907459465, 112637215, 2680153253, 3904427059, 2013776290,
- 251722036, 2517215374, 3775830040, 2137656763, 141376813, 2439277719,
- 3865271297, 1802195444, 476864866, 2238001368, 4066508878, 1812370925,
- 453092731, 2181625025, 4111451223, 1706088902, 314042704, 2344532202,
- 4240017532, 1658658271, 366619977, 2362670323, 4224994405, 1303535960,
- 984961486, 2747007092, 3569037538, 1256170817, 1037604311, 2765210733,
- 3554079995, 1131014506, 879679996, 2909243462, 3663771856, 1141124467,
- 855842277, 2852801631, 3708648649, 1342533948, 654459306, 3188396048,
- 3373015174, 1466479909, 544179635, 3110523913, 3462522015, 1591671054,
- 702138776, 2966460450, 3352799412, 1504918807, 783551873, 3082640443,
- 3233442989, 3988292384, 2596254646, 62317068, 1957810842, 3939845945,
- 2647816111, 81470997, 1943803523, 3814918930, 2489596804, 225274430,
- 2053790376, 3826175755, 2466906013, 167816743, 2097651377, 4027552580,
- 2265490386, 503444072, 1762050814, 4150417245, 2154129355, 426522225,
- 1852507879, 4275313526, 2312317920, 282753626, 1742555852, 4189708143,
- 2394877945, 397917763, 1622183637, 3604390888, 2714866558, 953729732,
- 1340076626, 3518719985, 2797360999, 1068828381, 1219638859, 3624741850,
- 2936675148, 906185462, 1090812512, 3747672003, 2825379669, 829329135,
- 1181335161, 3412177804, 3160834842, 628085408, 1382605366, 3423369109,
- 3138078467, 570562233, 1426400815, 3317316542, 2998733608, 733239954,
- 1555261956, 3268935591, 3050360625, 752459403, 1541320221, 2607071920,
- 3965973030, 1969922972, 40735498, 2617837225, 3943577151, 1913087877,
- 83908371, 2512341634, 3803740692, 2075208622, 213261112, 2463272603,
- 3855990285, 2094854071, 198958881, 2262029012, 4057260610, 1759359992,
- 534414190, 2176718541, 4139329115, 1873836001, 414664567, 2282248934,
- 4279200368, 1711684554, 285281116, 2405801727, 4167216745, 1634467795,
- 376229701, 2685067896, 3608007406, 1308918612, 956543938, 2808555105,
- 3495958263, 1231636301, 1047427035, 2932959818, 3654703836, 1088359270,
- 936918000, 2847714899, 3736837829, 1202900863, 817233897, 3183342108,
- 3401237130, 1404277552, 615818150, 3134207493, 3453421203, 1423857449,
- 601450431, 3009837614, 3294710456, 1567103746, 711928724, 3020668471,
- 3272380065, 1510334235, 755167117,
- ]
- o = -1
-
- def right_without_sign(num: int, bit: int=0) -> int:
- val = ctypes.c_uint32(num).value >> bit
- MAX32INT = 4294967295
- return (val + (MAX32INT + 1)) % (2 * (MAX32INT + 1)) - MAX32INT - 1
-
- for n in range(57):
- o = ie[(o & 255) ^ ord(e[n])] ^ right_without_sign(o, 8)
- return o ^ -1 ^ 3988292384
-
-
-lookup = [
- "Z",
- "m",
- "s",
- "e",
- "r",
- "b",
- "B",
- "o",
- "H",
- "Q",
- "t",
- "N",
- "P",
- "+",
- "w",
- "O",
- "c",
- "z",
- "a",
- "/",
- "L",
- "p",
- "n",
- "g",
- "G",
- "8",
- "y",
- "J",
- "q",
- "4",
- "2",
- "K",
- "W",
- "Y",
- "j",
- "0",
- "D",
- "S",
- "f",
- "d",
- "i",
- "k",
- "x",
- "3",
- "V",
- "T",
- "1",
- "6",
- "I",
- "l",
- "U",
- "A",
- "F",
- "M",
- "9",
- "7",
- "h",
- "E",
- "C",
- "v",
- "u",
- "R",
- "X",
- "5",
-]
-
-
-def tripletToBase64(e):
- return (
- lookup[63 & (e >> 18)] +
- lookup[63 & (e >> 12)] +
- lookup[(e >> 6) & 63] +
- lookup[e & 63]
- )
-
-
-def encodeChunk(e, t, r):
- m = []
- for b in range(t, r, 3):
- n = (16711680 & (e[b] << 16)) + \
- ((e[b + 1] << 8) & 65280) + (e[b + 2] & 255)
- m.append(tripletToBase64(n))
- return ''.join(m)
-
-
-def b64Encode(e):
- P = len(e)
- W = P % 3
- U = []
- z = 16383
- H = 0
- Z = P - W
- while H < Z:
- U.append(encodeChunk(e, H, Z if H + z > Z else H + z))
- H += z
- if 1 == W:
- F = e[P - 1]
- U.append(lookup[F >> 2] + lookup[(F << 4) & 63] + "==")
- elif 2 == W:
- F = (e[P - 2] << 8) + e[P - 1]
- U.append(lookup[F >> 10] + lookup[63 & (F >> 4)] +
- lookup[(F << 2) & 63] + "=")
- return "".join(U)
-
-
-def encodeUtf8(e):
- b = []
- m = urllib.parse.quote(e, safe='~()*!.\'')
- w = 0
- while w < len(m):
- T = m[w]
- if T == "%":
- E = m[w + 1] + m[w + 2]
- S = int(E, 16)
- b.append(S)
- w += 2
- else:
- b.append(ord(T[0]))
- w += 1
- return b
-
-
-def base36encode(number, alphabet='0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'):
- """Converts an integer to a base36 string."""
- if not isinstance(number, int):
- raise TypeError('number must be an integer')
-
- base36 = ''
- sign = ''
-
- if number < 0:
- sign = '-'
- number = -number
-
- if 0 <= number < len(alphabet):
- return sign + alphabet[number]
-
- while number != 0:
- number, i = divmod(number, len(alphabet))
- base36 = alphabet[i] + base36
-
- return sign + base36
-
-
-def base36decode(number):
- return int(number, 36)
-
-
-def get_search_id():
- e = int(time.time() * 1000) << 64
- t = int(random.uniform(0, 2147483646))
- return base36encode((e + t))
-
-
-if __name__ == '__main__':
- a = get_b3_trace_id()
- print(a)
diff --git a/apps/api/src/services/scrapers/xiaohongshu/xhs/login.py b/apps/api/src/services/scrapers/xiaohongshu/xhs/login.py
deleted file mode 100644
index fb671b0..0000000
--- a/apps/api/src/services/scrapers/xiaohongshu/xhs/login.py
+++ /dev/null
@@ -1,132 +0,0 @@
-import asyncio
-import functools
-import sys
-from typing import Optional
-
-# import redis
-from playwright.async_api import BrowserContext, Page
-from tenacity import RetryError, retry, retry_if_result, stop_after_attempt, wait_fixed
-
-from fastfetchbot_shared.utils.logger import logger
-
-# import config
-from .base_crawler import AbstractLogin
-from . import utils
-
-
-class XHSLogin(AbstractLogin):
- def __init__(
- self,
- login_type: str,
- browser_context: BrowserContext,
- context_page: Page,
- login_phone: Optional[str] = "",
- cookie_str: dict = {},
- ):
- self.login_type = login_type
- self.browser_context = browser_context
- self.context_page = context_page
- self.login_phone = login_phone
- self.cookie_str = cookie_str
-
- @retry(
- stop=stop_after_attempt(20),
- wait=wait_fixed(1),
- retry=retry_if_result(lambda value: value is False),
- )
- async def check_login_state(self, no_logged_in_session: str) -> bool:
- """
- Check if the current login status is successful and return True otherwise return False
- retry decorator will retry 20 times if the return value is False, and the retry interval is 1 second
- if max retry times reached, raise RetryError
- """
- current_cookie = await self.browser_context.cookies()
- _, cookie_dict = utils.convert_cookies(current_cookie)
- current_web_session = cookie_dict.get("web_session")
- if current_web_session != no_logged_in_session:
- return True
- return False
-
- async def begin(self):
- """Start login xiaohongshu"""
- logger.info("Begin login xiaohongshu ...")
- if self.login_type == "qrcode":
- await self.login_by_qrcode()
- elif self.login_type == "phone":
- await self.login_by_mobile()
- elif self.login_type == "cookie":
- await self.login_by_cookies()
- else:
- raise ValueError(
- "Invalid Login Type Currently only supported qrcode or phone or cookies ..."
- )
-
- async def login_by_mobile(self):
- pass
-
- async def login_by_qrcode(self):
- """login xiaohongshu website and keep webdriver login state"""
- logger.info("Begin login xiaohongshu by qrcode ...")
- # login_selector = "div.login-container > div.left > div.qrcode > img"
- qrcode_img_selector = "xpath=//img[@class='qrcode-img']"
- # find login qrcode
- base64_qrcode_img = await utils.find_login_qrcode(
- self.context_page, selector=qrcode_img_selector
- )
- if not base64_qrcode_img:
- logger.info("login failed , have not found qrcode please check ....")
- # if this website does not automatically popup login dialog box, we will manual click login button
- await asyncio.sleep(0.5)
- login_button_ele = self.context_page.locator(
- "xpath=//*[@id='app']/div[1]/div[2]/div[1]/ul/div[1]/button"
- )
- await login_button_ele.click()
- base64_qrcode_img = await utils.find_login_qrcode(
- self.context_page, selector=qrcode_img_selector
- )
- if not base64_qrcode_img:
- sys.exit()
-
- # get not logged session
- current_cookie = await self.browser_context.cookies()
- _, cookie_dict = utils.convert_cookies(current_cookie)
- no_logged_in_session = cookie_dict.get("web_session")
-
- # show login qrcode
- # fix issue #12
- # we need to use partial function to call show_qrcode function and run in executor
- # then current asyncio event loop will not be blocked
- partial_show_qrcode = functools.partial(utils.show_qrcode, base64_qrcode_img)
- asyncio.get_running_loop().run_in_executor(
- executor=None, func=partial_show_qrcode
- )
-
- logger.info(f"waiting for scan code login, remaining time is 20s")
- try:
- await self.check_login_state(no_logged_in_session)
- except RetryError:
- logger.info("Login xiaohongshu failed by qrcode login method ...")
- sys.exit()
-
- wait_redirect_seconds = 5
- logger.info(
- f"Login successful then wait for {wait_redirect_seconds} seconds redirect ..."
- )
- await asyncio.sleep(wait_redirect_seconds)
-
- async def login_by_cookies(self):
- """login xiaohongshu website by cookies"""
- logger.info("Begin login xiaohongshu by cookie ...")
- for key, value in self.cookie_str.items():
- if key != "web_session": # only set web_session cookie attr
- continue
- await self.browser_context.add_cookies(
- [
- {
- "name": key,
- "value": value,
- "domain": ".xiaohongshu.com",
- "path": "/",
- }
- ]
- )
diff --git a/apps/api/src/services/scrapers/xiaohongshu/xhs/proxy_account_pool.py b/apps/api/src/services/scrapers/xiaohongshu/xhs/proxy_account_pool.py
deleted file mode 100644
index 87c9202..0000000
--- a/apps/api/src/services/scrapers/xiaohongshu/xhs/proxy_account_pool.py
+++ /dev/null
@@ -1,132 +0,0 @@
-from typing import List, Optional, Set, Tuple
-
-from src import config
-
-
-class PhonePool:
- """phone pool class"""
-
- def __init__(self) -> None:
- self.phones: List[str] = []
- self.used_phones: Set[str] = set()
-
- def add_phone(self, phone: str) -> bool:
- """add phone to the pool"""
- if phone not in self.phones:
- self.phones.append(phone)
- return True
- return False
-
- def remove_phone(self, phone: str) -> bool:
- """remove phone from the pool"""
- if phone in self.used_phones:
- self.phones.remove(phone)
- self.used_phones.remove(phone)
- return True
- return False
-
- def get_phone(self) -> Optional[str]:
- """get phone and mark as used"""
- if self.phones:
- left_phone = self.phones.pop(0)
- self.used_phones.add(left_phone)
- return left_phone
- return None
-
- def clear(self):
- """clear phone pool"""
- self.phones = []
- self.used_phones = set()
-
-
-class IPPool:
- def __init__(self) -> None:
- self.ips: List[str] = []
- self.used_ips: Set[str] = set()
-
- def add_ip(self, ip):
- """add ip"""
- if ip not in self.ips:
- self.ips.append(ip)
- return True
- return False
-
- def remove_ip(self, ip: str) -> bool:
- """remove ip"""
- if ip in self.used_ips:
- self.ips.remove(ip)
- self.used_ips.remove(ip)
- return True
- return False
-
- def get_ip(self) -> Optional[str]:
- """get ip and mark as used"""
- if self.ips:
- left_ips = self.ips.pop(0)
- self.used_ips.add(left_ips)
- return left_ips
- return None
-
- def clear(self):
- """clear ip pool"""
- self.ips = []
- self.used_ips = set()
-
-
-class AccountPool:
- """account pool class"""
-
- def __init__(self):
- self.phone_pool = PhonePool()
- self.ip_pool = IPPool()
-
- def add_account(self, phone: str, ip: str) -> bool:
- """add account to pool with phone and ip"""
- if self.phone_pool.add_phone(phone) and self.ip_pool.add_ip(ip):
- return True
- return False
-
- def remove_account(self, phone: str, ip: str) -> bool:
- """remove account from pool"""
- if self.phone_pool.remove_phone(phone) and self.ip_pool.remove_ip(ip):
- return True
- return False
-
- def get_account(self) -> Tuple[str, str]:
- """get account if no account, reload account pool"""
- phone = self.phone_pool.get_phone()
- ip = self.ip_pool.get_ip()
- # if not phone or not ip:
- # reload_account_pool(self)
- # return self.get_account()
- return phone, ip
-
- def clear_account(self):
- """clear account pool"""
- self.phone_pool.clear()
- self.ip_pool.clear()
-
-
-def reload_account_pool(apo: AccountPool):
- """reload account pool"""
- apo.clear_account()
- for phone, ip in zip(config.XHS_PHONE_LIST, config.XHS_IP_PROXY_LIST):
- apo.add_account(phone, ip)
-
-
-def create_account_pool() -> AccountPool:
- """create account pool"""
- apo = AccountPool()
- reload_account_pool(apo=apo)
- return apo
-
-
-if __name__ == "__main__":
- import time
-
- ac_pool = create_account_pool()
- p, i = ac_pool.get_account()
- while p:
- print(f"get phone:{p}, ip proxy:{i} from account pool")
- p, i = ac_pool.get_account()
- time.sleep(1)
diff --git a/apps/api/src/services/scrapers/xiaohongshu/xhs/utils.py b/apps/api/src/services/scrapers/xiaohongshu/xhs/utils.py
deleted file mode 100644
index 43c2f44..0000000
--- a/apps/api/src/services/scrapers/xiaohongshu/xhs/utils.py
+++ /dev/null
@@ -1,146 +0,0 @@
-import base64
-import logging
-import os
-import random
-import re
-import time
-from io import BytesIO
-from typing import Dict, List, Optional, Tuple
-from urllib.parse import urlparse
-
-import httpx
-
-from PIL import Image, ImageDraw
-from playwright.async_api import Cookie, Page
-
-
-async def find_login_qrcode(page: Page, selector: str) -> str:
- """find login qrcode image from target selector"""
- try:
- elements = await page.wait_for_selector(
- selector=selector,
- )
- login_qrcode_img = await elements.get_property("src") # type: ignore
- return str(login_qrcode_img)
-
- except Exception as e:
- print(e)
- return ""
-
-
-def show_qrcode(qr_code) -> None: # type: ignore
- """parse base64 encode qrcode image and show it"""
- qr_code = qr_code.split(",")[1]
- qr_code = base64.b64decode(qr_code)
- image = Image.open(BytesIO(qr_code))
-
- # Add a square border around the QR code and display it within the border to improve scanning accuracy.
- width, height = image.size
- new_image = Image.new("RGB", (width + 20, height + 20), color=(255, 255, 255))
- new_image.paste(image, (10, 10))
- draw = ImageDraw.Draw(new_image)
- draw.rectangle((0, 0, width + 19, height + 19), outline=(0, 0, 0), width=1)
- new_image.show()
-
-
-def get_user_agent() -> str:
- ua_list = [
- "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
- "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.79 Safari/537.36",
- "Mozilla/5.0 (Windows NT 10.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36",
- "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36",
- "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.53 Safari/537.36",
- "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36",
- ]
- return random.choice(ua_list)
-
-
-def convert_cookies(cookies: Optional[List[Cookie]]) -> Tuple[str, Dict]:
- if not cookies:
- return "", {}
- cookies_str = ";".join(
- [f"{cookie.get('name')}={cookie.get('value')}" for cookie in cookies]
- )
- cookie_dict = dict()
- for cookie in cookies:
- cookie_dict[cookie.get("name")] = cookie.get("value")
- return cookies_str, cookie_dict
-
-
-def convert_str_cookie_to_dict(cookie_str: str) -> Dict:
- cookie_dict: Dict[str, str] = dict()
- if not cookie_str:
- return cookie_dict
- for cookie in cookie_str.split(";"):
- cookie = cookie.strip()
- if not cookie:
- continue
- cookie_list = cookie.split("=")
- if len(cookie_list) != 2:
- continue
- cookie_value = cookie_list[1]
- if isinstance(cookie_value, list):
- cookie_value = "".join(cookie_value)
- cookie_dict[cookie_list[0]] = cookie_value
- return cookie_dict
-
-
-def get_current_timestamp():
- return int(time.time() * 1000)
-
-
-def match_interact_info_count(count_str: str) -> int:
- if not count_str:
- return 0
-
- match = re.search(r"\d+", count_str)
- if match:
- number = match.group()
- return int(number)
- else:
- return 0
-
-
-def init_loging_config():
- level = logging.INFO
- logging.basicConfig(
- level=level,
- format="%(asctime)s %(name)s %(levelname)s %(message)s ",
- datefmt="%Y-%m-%d %H:%M:%S",
- )
- _logger = logging.getLogger("MediaCrawler")
- _logger.setLevel(level)
- return _logger
-
-
-logger = init_loging_config()
-
-
-def get_track_simple(distance) -> List[int]:
- track: List[int] = []
- current = 0
- mid = distance * 4 / 5
- t = 0.2
- v = 1
-
- while current < distance:
- if current < mid:
- a = 4
- else:
- a = -3
- v0 = v
- v = v0 + a * t # type: ignore
- move = v0 * t + 1 / 2 * a * t * t
- current += move # type: ignore
- track.append(round(move))
- return track
-
-
-def get_tracks(distance: int, level: str = "easy") -> List[int]:
- if level == "easy":
- return get_track_simple(distance)
- else:
- from . import easing
-
- _, tricks = easing.get_tracks(distance, seconds=2, ease_func="ease_out_expo")
- return tricks
diff --git a/apps/api/src/templates/xiaohongshu_content.jinja2 b/apps/api/src/templates/xiaohongshu_content.jinja2
index d26fbed..2bda357 100644
--- a/apps/api/src/templates/xiaohongshu_content.jinja2
+++ b/apps/api/src/templates/xiaohongshu_content.jinja2
@@ -1,7 +1,7 @@
{{ data.title }}
-作者:{{ data.author }}
+作者:{{ data.author }}
发布于 {{ data.created }} 最近更新于 {{ data.updated }}
-收藏:{{ data.collected_count }} 转发:{{ data.share_count }} 评论:{{ data.comments_count }} 点赞:{{ data.like_count }}
+收藏:{{ data.collected_count }} 转发:{{ data.share_count }} 评论:{{ data.comment_count }} 点赞:{{ data.like_count }}
{% if data.ip_location %}
来自{{ data.ip_location }}
{% endif %}
diff --git a/packages/shared/fastfetchbot_shared/config.py b/packages/shared/fastfetchbot_shared/config.py
index ec80059..fca6987 100644
--- a/packages/shared/fastfetchbot_shared/config.py
+++ b/packages/shared/fastfetchbot_shared/config.py
@@ -17,3 +17,7 @@
# Utils environment variables
HTTP_REQUEST_TIMEOUT = env.get("HTTP_REQUEST_TIMEOUT", 30)
+
+# XHS (Xiaohongshu) shared configuration
+SIGN_SERVER_URL = env.get("SIGN_SERVER_URL", "http://localhost:8989")
+XHS_COOKIE_PATH = env.get("XHS_COOKIE_PATH", "")
diff --git a/packages/shared/fastfetchbot_shared/utils/config.py b/packages/shared/fastfetchbot_shared/utils/config.py
index bad822d..a33b825 100644
--- a/packages/shared/fastfetchbot_shared/utils/config.py
+++ b/packages/shared/fastfetchbot_shared/utils/config.py
@@ -51,5 +51,6 @@
BANNED_PATTERNS = [
r"chatgpt\.com\/share\/[A-Za-z0-9]+",
r"gemini\/share\/[A-Za-z0-9]+",
- r"t\.me\/[A-Za-z0-9]+"
+ r"t\.me\/[A-Za-z0-9]+",
+ r"github\.com\/[A-Za-z0-9_-]+\/[A-Za-z0-9_-]+",
]
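The widened `BANNED_PATTERNS` list above can be exercised with a quick sketch. The `is_banned` helper is hypothetical (the real check lives inside `get_url_metadata` in `utils/parse.py`); the patterns are copied from the hunk, including the new `github.com` repo pattern:

```python
import re

# Mirrors BANNED_PATTERNS above, including the new github.com repo pattern.
BANNED_PATTERNS = [
    r"chatgpt\.com\/share\/[A-Za-z0-9]+",
    r"gemini\/share\/[A-Za-z0-9]+",
    r"t\.me\/[A-Za-z0-9]+",
    r"github\.com\/[A-Za-z0-9_-]+\/[A-Za-z0-9_-]+",
]

def is_banned(url: str) -> bool:
    """Return True when any banned pattern matches somewhere in the URL."""
    return any(re.search(p, url) for p in BANNED_PATTERNS)
```

Note the new pattern requires two path segments, so `https://github.com/owner/repo` is banned while a bare profile URL like `https://github.com/owner` is not.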
diff --git a/packages/shared/fastfetchbot_shared/utils/parse.py b/packages/shared/fastfetchbot_shared/utils/parse.py
index 75076b7..61527af 100644
--- a/packages/shared/fastfetchbot_shared/utils/parse.py
+++ b/packages/shared/fastfetchbot_shared/utils/parse.py
@@ -84,7 +84,7 @@ async def get_url_metadata(url: str, ban_list: Optional[list] = None) -> UrlMeta
source = website
content_type = "video"
# clear the url query
- if source not in ["youtube", "bilibili", "wechat"]:
+ if source not in ["youtube", "bilibili", "wechat", "xiaohongshu"]:
url = url_parser.scheme + "://" + url_parser.netloc + url_parser.path
if source in ban_list:
source = "banned"
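The query-stripping exception above can be sketched as follows. `clear_query` and `KEEP_QUERY` are illustrative names mirroring the inline logic, not functions from the codebase; Xiaohongshu is added to the exclusion list presumably because its share links carry access parameters (such as `xsec_token`) in the query string that the scraper needs:

```python
from urllib.parse import urlparse

# Platforms whose URLs must keep their query string; mirrors the updated
# condition in get_url_metadata (xiaohongshu newly added).
KEEP_QUERY = {"youtube", "bilibili", "wechat", "xiaohongshu"}

def clear_query(url: str, source: str) -> str:
    """Drop the query string for sources that do not need it."""
    if source in KEEP_QUERY:
        return url
    p = urlparse(url)
    return f"{p.scheme}://{p.netloc}{p.path}"
```

For example, `clear_query("https://twitter.com/u/status/1?s=20", "twitter")` yields `https://twitter.com/u/status/1`, while a Xiaohongshu explore link passes through unchanged, token and all.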
diff --git a/template.env b/template.env
index 429a75a..b5c6038 100644
--- a/template.env
+++ b/template.env
@@ -78,6 +78,14 @@ XIAOHONGSHU_WEBID=
# The websession cookie of xiaohongshu. Default: `None`
XIAOHONGSHU_WEBSESSION=
+# XHS Sign Server URL for the signing proxy. Default: `http://localhost:8989`
+SIGN_SERVER_URL=
+
+# Path to a file containing XHS cookies as a single-line string.
+# Default: conf/xhs_cookies.txt (relative to apps/api). Override with an absolute path if needed.
+# Takes priority over XIAOHONGSHU_A1/WEBID/WEBSESSION when the file exists.
+XHS_COOKIE_PATH=
+
# OpenAI
# The api key of OpenAI. Default: `None`
OPENAI_API_KEY=
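The cookie-file priority described in the `template.env` comments above could be resolved roughly like this. This is a minimal sketch: `load_xhs_cookie` and the fallback cookie keys (`a1`, `webId`, `web_session`) are assumptions for illustration, not the adapter's actual implementation:

```python
import os
from pathlib import Path

def load_xhs_cookie(cookie_path: str) -> str:
    """Resolve the XHS cookie string: a cookie file takes priority over the
    individual XIAOHONGSHU_* env vars when it exists (per template.env)."""
    if cookie_path and Path(cookie_path).is_file():
        return Path(cookie_path).read_text(encoding="utf-8").strip()
    # Fall back to assembling a cookie string from the legacy env vars.
    parts = {
        "a1": os.environ.get("XIAOHONGSHU_A1", ""),
        "webId": os.environ.get("XIAOHONGSHU_WEBID", ""),
        "web_session": os.environ.get("XIAOHONGSHU_WEBSESSION", ""),
    }
    return "; ".join(f"{k}={v}" for k, v in parts.items() if v)
```

With `XHS_COOKIE_PATH` unset or pointing at a missing file, the legacy `XIAOHONGSHU_*` variables are stitched into a single cookie header string; empty values are skipped.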