s
- term
- def
diff --git a/.codecov.yml b/.codecov.yml
new file mode 100644
index 0000000..bdccd27
--- /dev/null
+++ b/.codecov.yml
@@ -0,0 +1,24 @@
+coverage:
+ status:
+ # 1. Project Coverage: Enforces rules on the entire codebase
+ project:
+ default:
+ target: auto # 'auto' means the PR cannot decrease the overall coverage
+ threshold: 1% # Allows a 1% drop margin for acceptable fluctuations
+
+ # 2. Patch Coverage: Enforces rules ONLY on the lines of code modified in the PR
+ patch:
+ default:
+      target: 80%   # Require at least 80% coverage on the lines changed in the PR
+
+# 3. Ignore paths: Exclude test files and configs from coverage calculations
+ignore:
+ - "tests/**/*"
+ - ".github/**/*"
+ - "**/__init__.py"
+
+# Optional: Configure the Codecov PR comment bot
+comment:
+ layout: "reach, diff, flags, files"
+ behavior: default
+ require_changes: false # Always post a comment, even if coverage didn't change
\ No newline at end of file
diff --git a/.github/workflows/pr-gate.yml b/.github/workflows/pr-gate.yml
new file mode 100644
index 0000000..178fe10
--- /dev/null
+++ b/.github/workflows/pr-gate.yml
@@ -0,0 +1,43 @@
+name: pr-gate
+
+on:
+ pull_request:
+ branches:
+ - main
+
+concurrency:
+ group: pr-gate-${{ github.head_ref }}
+ cancel-in-progress: true
+
+jobs:
+ test:
+ runs-on: ubuntu-latest
+ permissions:
+ contents: read
+
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v4
+ with:
+ fetch-depth: 0
+
+ - name: Install uv
+ uses: astral-sh/setup-uv@v6
+ with:
+ version: "0.10.4"
+
+ - name: Set up Python
+ run: uv python install 3.12
+
+ - name: Install dependencies
+ run: uv sync
+
+ - name: Run tests with coverage
+ run: uv run pytest --cov --cov-report=xml
+
+ - name: Upload coverage to Codecov
+ uses: codecov/codecov-action@v5
+ with:
+ files: coverage.xml
+ token: ${{ secrets.CODECOV_TOKEN }}
+ fail_ci_if_error: true
diff --git a/apps/telegram-bot/tests/conftest.py b/apps/telegram-bot/tests/conftest.py
deleted file mode 100644
index d006314..0000000
--- a/apps/telegram-bot/tests/conftest.py
+++ /dev/null
@@ -1,67 +0,0 @@
-"""
-Test fixtures for the telegram-bot app.
-
-All bot service calls are mocked — tests verify routing, auth, and
-request handling without touching the real Telegram API.
-"""
-
-import os
-
-import pytest
-import pytest_asyncio
-from contextlib import asynccontextmanager
-from unittest.mock import patch
-
-from httpx import AsyncClient, ASGITransport
-
-TEST_TELEGRAM_SECRET = "test-telegram-secret-token"
-
-os.environ["TELEGRAM_BOT_SECRET_TOKEN"] = TEST_TELEGRAM_SECRET
-os.environ["TELEGRAM_BOT_TOKEN"] = "000000000:AAFakeTokenForTesting"
-os.environ["DATABASE_ON"] = "false"
-os.environ["BASE_URL"] = "localhost"
-
-
-@pytest.fixture(scope="session")
-def anyio_backend():
- return "asyncio"
-
-
-@pytest_asyncio.fixture(scope="module")
-async def app():
- """
- Create a Starlette webhook_app with a no-op lifespan
- so we can test routes without real bot initialization.
- """
- @asynccontextmanager
- async def mock_lifespan(app):
- yield
-
- with patch("core.webhook.server.lifespan", mock_lifespan):
- from starlette.applications import Starlette
- from starlette.routing import Route
- from core.webhook.server import telegram_webhook, send_message_endpoint, health
-
- test_app = Starlette(
- routes=[
- Route("/webhook", telegram_webhook, methods=["POST"]),
- Route("/send_message", send_message_endpoint, methods=["POST"]),
- Route("/health", health, methods=["GET"]),
- ],
- lifespan=mock_lifespan,
- )
- yield test_app
-
-
-@pytest_asyncio.fixture(scope="module")
-async def client(app):
- """Async HTTP client hitting the Starlette app via ASGI transport."""
- transport = ASGITransport(app=app)
- async with AsyncClient(transport=transport, base_url="http://test") as ac:
- yield ac
-
-
-@pytest.fixture
-def telegram_auth_headers():
- """Headers dict with valid Telegram secret token."""
- return {"X-Telegram-Bot-Api-Secret-Token": TEST_TELEGRAM_SECRET}
diff --git a/apps/telegram-bot/tests/test_webhook.py b/apps/telegram-bot/tests/test_webhook.py
deleted file mode 100644
index e1ddd93..0000000
--- a/apps/telegram-bot/tests/test_webhook.py
+++ /dev/null
@@ -1,101 +0,0 @@
-"""
-Tests for the telegram-bot webhook endpoint.
-
-Verifies:
- - Auth: valid/missing/wrong secret token handling
- - Processing: updates are dispatched via asyncio.create_task (fire-and-forget)
- - Response: 200 with {"status": "ok"} for valid requests
-"""
-
-import asyncio
-
-import pytest
-from unittest.mock import AsyncMock, patch
-
-from tests.conftest import TEST_TELEGRAM_SECRET
-
-
-SAMPLE_UPDATE = {
- "update_id": 123456,
- "message": {
- "message_id": 1,
- "text": "/start",
- "chat": {"id": 789, "type": "private"},
- },
-}
-
-
-class TestTelegramWebhook:
- """Tests for POST /webhook"""
-
- @pytest.mark.asyncio
- async def test_webhook_accepts_valid_update(self, client, telegram_auth_headers):
- """Valid secret token + JSON body -> 200, process_telegram_update is called."""
- with patch(
- "core.webhook.server.process_telegram_update",
- new_callable=AsyncMock,
- ) as mock_process:
- resp = await client.post(
- "/webhook",
- json=SAMPLE_UPDATE,
- headers=telegram_auth_headers,
- )
-
- assert resp.status_code == 200
- assert resp.json() == {"status": "ok"}
-
- # Allow the create_task coroutine to run
- await asyncio.sleep(0)
- mock_process.assert_called_once_with(SAMPLE_UPDATE)
-
- @pytest.mark.asyncio
- async def test_webhook_rejects_missing_token(self, client):
- """No secret token header -> 401."""
- resp = await client.post("/webhook", json=SAMPLE_UPDATE)
- assert resp.status_code == 401
- assert resp.json() == {"error": "unauthorized"}
-
- @pytest.mark.asyncio
- async def test_webhook_rejects_wrong_token(self, client):
- """Wrong secret token -> 401."""
- resp = await client.post(
- "/webhook",
- json=SAMPLE_UPDATE,
- headers={"X-Telegram-Bot-Api-Secret-Token": "wrong-token"},
- )
- assert resp.status_code == 401
- assert resp.json() == {"error": "unauthorized"}
-
- @pytest.mark.asyncio
- async def test_webhook_responds_before_processing_completes(
- self, client, telegram_auth_headers
- ):
- """
- The webhook must return 200 immediately, before the update
- processing finishes. This is the fire-and-forget behavior that
- prevents Telegram from timing out on slow handlers.
- """
- processing_started = asyncio.Event()
- processing_gate = asyncio.Event()
-
- async def slow_process(data):
- processing_started.set()
- await processing_gate.wait() # Block until test releases
-
- with patch(
- "core.webhook.server.process_telegram_update",
- side_effect=slow_process,
- ):
- resp = await client.post(
- "/webhook",
- json=SAMPLE_UPDATE,
- headers=telegram_auth_headers,
- )
-
- # Response arrived while processing is still blocked
- assert resp.status_code == 200
- assert resp.json() == {"status": "ok"}
-
- # Let the background task finish to avoid warnings
- processing_gate.set()
- await asyncio.sleep(0)
diff --git a/packages/shared/tests/test_user_setting.py b/packages/shared/tests/test_user_setting.py
deleted file mode 100644
index 4acebcd..0000000
--- a/packages/shared/tests/test_user_setting.py
+++ /dev/null
@@ -1,115 +0,0 @@
-import pytest
-import pytest_asyncio
-from sqlalchemy import select
-from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
-
-from fastfetchbot_shared.database.base import Base
-from fastfetchbot_shared.database.models.user_setting import UserSetting
-
-
-@pytest_asyncio.fixture
-async def db_session():
- """In-memory SQLite session for testing."""
- engine = create_async_engine("sqlite+aiosqlite://", echo=False)
- async with engine.begin() as conn:
- await conn.run_sync(Base.metadata.create_all)
- session_factory = async_sessionmaker(engine, expire_on_commit=False)
- async with session_factory() as session:
- yield session
- await engine.dispose()
-
-
-@pytest.mark.asyncio
-async def test_create_user_setting(db_session):
- setting = UserSetting(telegram_user_id=123456789, auto_fetch_in_dm=True)
- db_session.add(setting)
- await db_session.commit()
-
- result = await db_session.execute(
- select(UserSetting).where(UserSetting.telegram_user_id == 123456789)
- )
- fetched = result.scalar_one()
- assert fetched.auto_fetch_in_dm is True
- assert fetched.created_at is not None
- assert fetched.updated_at is not None
-
-
-@pytest.mark.asyncio
-async def test_toggle_user_setting(db_session):
- setting = UserSetting(telegram_user_id=123456789, auto_fetch_in_dm=True)
- db_session.add(setting)
- await db_session.commit()
-
- setting.auto_fetch_in_dm = False
- await db_session.commit()
-
- result = await db_session.execute(
- select(UserSetting).where(UserSetting.telegram_user_id == 123456789)
- )
- fetched = result.scalar_one()
- assert fetched.auto_fetch_in_dm is False
-
-
-@pytest.mark.asyncio
-async def test_default_auto_fetch_is_true(db_session):
- setting = UserSetting(telegram_user_id=999999)
- db_session.add(setting)
- await db_session.commit()
-
- result = await db_session.execute(
- select(UserSetting).where(UserSetting.telegram_user_id == 999999)
- )
- fetched = result.scalar_one()
- assert fetched.auto_fetch_in_dm is True
-
-
-@pytest.mark.asyncio
-async def test_no_record_returns_none(db_session):
- result = await db_session.execute(
- select(UserSetting).where(UserSetting.telegram_user_id == 888888)
- )
- assert result.scalar_one_or_none() is None
-
-
-@pytest.mark.asyncio
-async def test_ensure_user_settings_creates_row(db_session):
- """ensure pattern: first call creates row with defaults, second is a no-op."""
- user_id = 777777
-
- # No row yet
- result = await db_session.execute(
- select(UserSetting).where(UserSetting.telegram_user_id == user_id)
- )
- assert result.scalar_one_or_none() is None
-
- # Simulate ensure: create if missing
- result = await db_session.execute(
- select(UserSetting).where(UserSetting.telegram_user_id == user_id)
- )
- if result.scalar_one_or_none() is None:
- db_session.add(UserSetting(telegram_user_id=user_id))
- await db_session.commit()
-
- # Row exists with defaults
- result = await db_session.execute(
- select(UserSetting).where(UserSetting.telegram_user_id == user_id)
- )
- setting = result.scalar_one()
- assert setting.auto_fetch_in_dm is True
- assert setting.created_at is not None
-
- # Second ensure is a no-op — row unchanged
- original_created_at = setting.created_at
- result = await db_session.execute(
- select(UserSetting).where(UserSetting.telegram_user_id == user_id)
- )
- if result.scalar_one_or_none() is None:
- db_session.add(UserSetting(telegram_user_id=user_id))
- await db_session.commit()
-
- result = await db_session.execute(
- select(UserSetting).where(UserSetting.telegram_user_id == user_id)
- )
- setting = result.scalar_one()
- assert setting.auto_fetch_in_dm is True
- assert setting.created_at == original_created_at
diff --git a/pyproject.toml b/pyproject.toml
index 43c35c4..3100a79 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -55,6 +55,7 @@ dev = [
"pytest>=8.3.5,<9.0.0",
"pytest-asyncio>=0.26.0,<0.27.0",
"celery-types>=0.24.0",
+ "pytest-cov>=7.1.0",
]
[build-system]
@@ -74,3 +75,4 @@ fastfetchbot-file-export = { workspace = true }
[tool.pytest.ini_options]
asyncio_default_fixture_loop_scope = "module"
+testpaths = ["tests"]
diff --git a/apps/telegram-bot/tests/__init__.py b/tests/integration/__init__.py
similarity index 100%
rename from apps/telegram-bot/tests/__init__.py
rename to tests/integration/__init__.py
diff --git a/tests/routers/test_scraper.py b/tests/routers/test_scraper.py
deleted file mode 100644
index 4aebbef..0000000
--- a/tests/routers/test_scraper.py
+++ /dev/null
@@ -1,181 +0,0 @@
-"""
-Tests for /scraper router endpoints.
-
-Endpoints:
- POST /scraper/getItem — Scrape content from a URL
- POST /scraper/getUrlMetadata — Get URL metadata without scraping
-
-All downstream services (InfoExtractService, get_url_metadata) are mocked.
-We only test: routing, auth, parameter parsing, and response shape.
-"""
-
-import pytest
-from unittest.mock import AsyncMock, patch
-
-from tests.conftest import TEST_API_KEY, TEST_API_KEY_NAME
-
-# NOTE on "no API key" tests:
-# auth.py has a bug where verify_key checks `api_key_query is None` (module-level
-# variable, always not None) instead of checking `input_key is None`. When no key
-# is provided, secrets.compare_digest(None, str) raises TypeError which propagates
-# as an unhandled exception. These tests are marked xfail to document this known bug.
-# Once auth.py is fixed, remove xfail and assert 401.
-
-
-# ─── POST /scraper/getItem ───────────────────────────────────────────
-
-
-class TestGetItem:
- """Tests for POST /scraper/getItem"""
-
- @pytest.mark.asyncio
- async def test_returns_scraped_data(
- self, client, auth_params, mock_get_url_metadata, mock_info_extract_service
- ):
- """Happy path: valid API key + valid url → returns scraped result."""
- _, mock_result = mock_info_extract_service
- params = {**auth_params, "url": "https://twitter.com/user/status/123"}
-
- resp = await client.post("/scraper/getItem", params=params)
-
- assert resp.status_code == 200
- assert resp.json() == mock_result
-
- @pytest.mark.asyncio
- async def test_rejects_with_wrong_api_key(self, client):
- """Wrong API key → 401."""
- resp = await client.post(
- "/scraper/getItem",
- params={TEST_API_KEY_NAME: "wrong-key", "url": "https://example.com"},
- )
- assert resp.status_code == 401
-
- @pytest.mark.xfail(
- reason="auth.py bug: verify_key checks wrong variable for None, "
- "TypeError propagates instead of returning 401",
- raises=TypeError,
- strict=True,
- )
- @pytest.mark.asyncio
- async def test_no_api_key_returns_401(self, client):
- """No API key → should be 401. Blocked by auth.py bug."""
- resp = await client.post(
- "/scraper/getItem", params={"url": "https://example.com"}
- )
- assert resp.status_code == 401
-
- @pytest.mark.xfail(
- reason="scraper.py does dict.pop('url') without default → unhandled KeyError",
- raises=KeyError,
- strict=True,
- )
- @pytest.mark.asyncio
- async def test_missing_url_returns_error(
- self, client, auth_params, mock_get_url_metadata, mock_info_extract_service
- ):
- """No url param → should return 4xx, but KeyError propagates unhandled."""
- resp = await client.post("/scraper/getItem", params=auth_params)
- assert resp.status_code in (400, 422)
-
- @pytest.mark.asyncio
- async def test_strips_api_key_from_downstream_params(
- self, client, auth_params, mock_get_url_metadata, mock_info_extract_service
- ):
- """
- API_KEY_NAME should be stripped from query_params before passing
- to InfoExtractService. Extra params should pass through.
- """
- mock_cls, _ = mock_info_extract_service
- params = {
- **auth_params,
- "url": "https://twitter.com/user/status/123",
- "extra_option": "value",
- }
-
- resp = await client.post("/scraper/getItem", params=params)
-
- assert resp.status_code == 200
- # InfoExtractService(url_metadata, **query_params) — verify call
- call_args, call_kwargs = mock_cls.call_args
- # API key name must NOT be in kwargs
- assert TEST_API_KEY_NAME not in call_kwargs
- # extra_option MUST be in kwargs
- assert call_kwargs.get("extra_option") == "value"
-
- @pytest.mark.asyncio
- async def test_passes_ban_list_to_metadata(
- self, client, auth_params, mock_get_url_metadata, mock_info_extract_service
- ):
- """ban_list param should be forwarded to get_url_metadata."""
- mock_fn, _ = mock_get_url_metadata
- params = {
- **auth_params,
- "url": "https://twitter.com/user/status/123",
- "ban_list": "twitter,weibo",
- }
-
- resp = await client.post("/scraper/getItem", params=params)
-
- assert resp.status_code == 200
- mock_fn.assert_called_once_with(
- "https://twitter.com/user/status/123", "twitter,weibo"
- )
-
-
-# ─── POST /scraper/getUrlMetadata ────────────────────────────────────
-
-
-class TestGetUrlMetadata:
- """Tests for POST /scraper/getUrlMetadata"""
-
- @pytest.mark.asyncio
- async def test_returns_metadata_dict(
- self, client, auth_params, mock_get_url_metadata
- ):
- """Happy path: returns UrlMetadata.to_dict() result."""
- params = {**auth_params, "url": "https://twitter.com/user/status/123"}
-
- resp = await client.post("/scraper/getUrlMetadata", params=params)
-
- assert resp.status_code == 200
- data = resp.json()
- assert data["source"] == "twitter"
- assert data["content_type"] == "social_media"
- assert "url" in data
-
- @pytest.mark.asyncio
- async def test_rejects_with_wrong_api_key(self, client):
- """Wrong API key → 401."""
- resp = await client.post(
- "/scraper/getUrlMetadata",
- params={TEST_API_KEY_NAME: "wrong-key", "url": "https://example.com"},
- )
- assert resp.status_code == 401
-
- @pytest.mark.xfail(
- reason="auth.py bug: verify_key checks wrong variable for None",
- raises=TypeError,
- strict=True,
- )
- @pytest.mark.asyncio
- async def test_no_api_key_returns_401(self, client):
- """No API key → should be 401. Blocked by auth.py bug."""
- resp = await client.post(
- "/scraper/getUrlMetadata", params={"url": "https://example.com"}
- )
- assert resp.status_code == 401
-
- @pytest.mark.asyncio
- async def test_metadata_url_and_ban_list_passthrough(
- self, client, auth_params, mock_get_url_metadata
- ):
- """url and ban_list params reach get_url_metadata unchanged."""
- mock_fn, _ = mock_get_url_metadata
- test_url = "https://weibo.com/some/post/456"
- params = {**auth_params, "url": test_url, "ban_list": "reddit"}
-
- await client.post("/scraper/getUrlMetadata", params=params)
-
- mock_fn.assert_called_once()
- args = mock_fn.call_args[0]
- assert args[0] == test_url
diff --git a/tests/routers/test_telegram_bot.py b/tests/routers/test_telegram_bot.py
deleted file mode 100644
index 8673c19..0000000
--- a/tests/routers/test_telegram_bot.py
+++ /dev/null
@@ -1,134 +0,0 @@
-"""
-Tests for /telegram router endpoints.
-
-Endpoints:
- POST /telegram/bot/webhook — Receive Telegram updates
- GET /telegram/bot/set_webhook — Set the webhook URL
-
-All Telegram service calls are mocked.
-"""
-
-import pytest
-from unittest.mock import AsyncMock, patch
-
-from tests.conftest import (
- TEST_API_KEY,
- TEST_API_KEY_NAME,
- TEST_TELEGRAM_SECRET,
-)
-
-
-class TestTelegramWebhook:
- """Tests for POST /telegram/bot/webhook"""
-
- @pytest.mark.asyncio
- async def test_webhook_accepts_valid_update(
- self, client, telegram_auth_headers
- ):
- """
- Valid secret token + JSON body → 200, background task queued.
- """
- with patch(
- "app.routers.telegram_bot.process_telegram_update",
- new_callable=AsyncMock,
- ):
- update_data = {
- "update_id": 123456,
- "message": {
- "message_id": 1,
- "text": "/start",
- "chat": {"id": 789, "type": "private"},
- },
- }
-
- resp = await client.post(
- "/telegram/bot/webhook",
- json=update_data,
- headers=telegram_auth_headers,
- )
-
- assert resp.status_code == 200
- assert resp.json() == "ok"
- # Background task should have been called with the update data
- # Note: BackgroundTasks in test mode may execute synchronously
- # The key assertion is that the endpoint accepted the request
-
- @pytest.mark.asyncio
- async def test_webhook_rejects_missing_token(self, client):
- """No secret token header → 401."""
- resp = await client.post(
- "/telegram/bot/webhook",
- json={"update_id": 1},
- )
- assert resp.status_code == 401
-
- @pytest.mark.asyncio
- async def test_webhook_rejects_wrong_token(self, client):
- """Wrong secret token → 401."""
- resp = await client.post(
- "/telegram/bot/webhook",
- json={"update_id": 1},
- headers={"X-Telegram-Bot-Api-Secret-Token": "wrong-token"},
- )
- assert resp.status_code == 401
-
-
-class TestSetWebhook:
- """Tests for GET /telegram/bot/set_webhook"""
-
- @pytest.mark.asyncio
- async def test_set_webhook_success(self, client, auth_params):
- """set_webhook returns True → 200 'ok'."""
- with patch(
- "app.routers.telegram_bot.set_webhook",
- new_callable=AsyncMock,
- return_value=True,
- ):
- resp = await client.get(
- "/telegram/bot/set_webhook", params=auth_params
- )
- assert resp.status_code == 200
- assert resp.json() == "ok"
-
- @pytest.mark.asyncio
- async def test_set_webhook_failure(self, client, auth_params):
- """set_webhook returns False → 500."""
- with patch(
- "app.routers.telegram_bot.set_webhook",
- new_callable=AsyncMock,
- return_value=False,
- ):
- resp = await client.get(
- "/telegram/bot/set_webhook", params=auth_params
- )
- assert resp.status_code == 500
-
- @pytest.mark.asyncio
- async def test_set_webhook_wrong_api_key(self, client):
- """Wrong API key → 401."""
- with patch(
- "app.routers.telegram_bot.set_webhook",
- new_callable=AsyncMock,
- return_value=True,
- ):
- resp = await client.get(
- "/telegram/bot/set_webhook",
- params={TEST_API_KEY_NAME: "bad-key"},
- )
- assert resp.status_code == 401
-
- @pytest.mark.xfail(
- reason="auth.py bug: verify_key checks wrong variable for None",
- raises=TypeError,
- strict=True,
- )
- @pytest.mark.asyncio
- async def test_set_webhook_no_api_key_returns_401(self, client):
- """No API key → should be 401. Blocked by auth.py bug."""
- with patch(
- "app.routers.telegram_bot.set_webhook",
- new_callable=AsyncMock,
- return_value=True,
- ):
- resp = await client.get("/telegram/bot/set_webhook")
- assert resp.status_code == 401
diff --git a/tests/routers/test_twitter.py b/tests/routers/test_twitter.py
deleted file mode 100644
index f7d9e6e..0000000
--- a/tests/routers/test_twitter.py
+++ /dev/null
@@ -1,77 +0,0 @@
-"""
-Tests for /twitter router endpoints.
-
-Endpoints:
- POST /twitter/repost — Handle twitter repost webhook
-
-NOTE: twitter router is NOT registered in production app (main.py).
- It's included in the test app via conftest.py for testing purposes.
- This is either an oversight or intentional — flag for review.
-
-InfoExtractService is mocked — we don't make real Twitter API calls in tests.
-"""
-
-import pytest
-from unittest.mock import AsyncMock, patch
-
-from tests.conftest import TEST_API_KEY, TEST_API_KEY_NAME
-
-
-class TestTwitterRepost:
- """Tests for POST /twitter/repost"""
-
- @pytest.mark.asyncio
- async def test_repost_returns_ok(self, client, auth_params):
- """Happy path: valid url → InfoExtractService called → returns 'ok'."""
- with patch(
- "app.routers.twitter.InfoExtractService"
- ) as MockCls:
- instance = MockCls.return_value
- instance.get_item = AsyncMock(return_value={"text": "mocked"})
-
- params = {**auth_params, "url": "https://twitter.com/user/status/999"}
- resp = await client.post("/twitter/repost", params=params)
-
- assert resp.status_code == 200
- assert resp.json() == "ok"
-
- # Verify InfoExtractService was constructed with correct metadata dict
- call_args = MockCls.call_args[0][0]
- assert call_args["url"] == "https://twitter.com/user/status/999"
- assert call_args["source"] == "twitter"
- assert call_args["type"] == "social_media"
-
- # Verify get_item was actually called
- instance.get_item.assert_awaited_once()
-
- @pytest.mark.asyncio
- async def test_repost_rejects_wrong_api_key(self, client):
- """Wrong API key → 401."""
- resp = await client.post(
- "/twitter/repost",
- params={
- TEST_API_KEY_NAME: "totally-wrong-key",
- "url": "https://twitter.com/x/status/1",
- },
- )
- assert resp.status_code == 401
-
- @pytest.mark.xfail(
- reason="auth.py bug: verify_key checks wrong variable for None",
- raises=TypeError,
- strict=True,
- )
- @pytest.mark.asyncio
- async def test_repost_no_api_key_returns_401(self, client):
- """No API key → should be 401. Blocked by auth.py bug."""
- resp = await client.post(
- "/twitter/repost",
- params={"url": "https://twitter.com/x/status/1"},
- )
- assert resp.status_code == 401
-
- @pytest.mark.asyncio
- async def test_repost_missing_url(self, client, auth_params):
- """Missing url param → 422 (FastAPI validation error for required param)."""
- resp = await client.post("/twitter/repost", params=auth_params)
- assert resp.status_code == 422
diff --git a/tests/test_bluesky.py b/tests/test_bluesky.py
deleted file mode 100644
index 9639cc5..0000000
--- a/tests/test_bluesky.py
+++ /dev/null
@@ -1,81 +0,0 @@
-from typing import Tuple
-
-import pytest
-import pytest_asyncio
-
-from app.services.scrapers.bluesky.scraper import BlueskyScraper
-from app.services.scrapers.scraper_manager import ScraperManager
-from app.utils.logger import logger
-from tests.cases.bluesky import bluesky_cases
-
-
-@pytest_asyncio.fixture(scope="module", autouse=True)
-async def bluesky_scraper():
- bluesky_scraper = await ScraperManager.init_bluesky_scraper()
- return bluesky_scraper
-
-
-async def get_item_from_url(bluesky_scraper: BlueskyScraper, url: str) -> dict:
- data_processor = await bluesky_scraper.get_processor_by_url(url)
- item = await data_processor.get_item()
- return item
-
-
-async def get_test_data(bluesky_scraper: BlueskyScraper, case: str) -> Tuple[dict, dict]:
- data = await get_item_from_url(bluesky_scraper=bluesky_scraper, url=bluesky_cases[case]["url"])
- return data, bluesky_cases[case]["expected"]
-
-
-@pytest.mark.asyncio
-async def test_bluesky_init(bluesky_scraper: BlueskyScraper):
- assert bluesky_scraper is not None
- assert isinstance(bluesky_scraper, BlueskyScraper)
-
-
-@pytest.mark.asyncio
-async def test_bluesky_pure_text_post(bluesky_scraper: BlueskyScraper):
- data, expected = await get_test_data(bluesky_scraper, "pure_text")
- assert True
- # assert data == expected
-
-
-@pytest.mark.asyncio
-async def test_bluesky_text_with_media_post(bluesky_scraper: BlueskyScraper):
- data, expected = await get_test_data(bluesky_scraper, "text_with_media")
- assert True
- # assert data == expected
-
-
-@pytest.mark.asyncio
-async def test_bluesky_text_with_text_repost_post(bluesky_scraper: BlueskyScraper):
- data, expected = await get_test_data(bluesky_scraper, "text_with_text_repost")
- assert True
- # assert data == expected
-
-
-@pytest.mark.asyncio
-async def test_bluesky_single_video_post(bluesky_scraper: BlueskyScraper):
- data, expected = await get_test_data(bluesky_scraper, "single_video_2")
- assert True
- # assert data == expected
-
-
-@pytest.mark.asyncio
-async def test_bluesky_post_in_middle_of_thread(bluesky_scraper: BlueskyScraper):
- data, expected = await get_test_data(bluesky_scraper, "post_in_middle_of_thread")
- assert True
- # assert data == expected
-
-
-@pytest.mark.asyncio
-async def test_bluesky_post_as_first_of_thread(bluesky_scraper: BlueskyScraper):
- data, expected = await get_test_data(bluesky_scraper, "post_as_first_of_thread")
- assert True
- # assert data == expected
-
-
-@pytest.mark.asyncio
-async def test_bluesky_post_as_last_of_thread(bluesky_scraper: BlueskyScraper):
- data, expected = await get_test_data(bluesky_scraper, "post_as_last_of_thread")
- assert True
- # assert data == expected
diff --git a/tests/test_weibo.py b/tests/test_weibo.py
deleted file mode 100644
index ae36f13..0000000
--- a/tests/test_weibo.py
+++ /dev/null
@@ -1,32 +0,0 @@
-from typing import Tuple
-
-import pytest
-import pytest_asyncio
-
-from app.services.scrapers.weibo.scraper import WeiboScraper
-from app.services.scrapers.scraper_manager import ScraperManager
-from app.utils.logger import logger
-from tests.cases.weibo import weibo_cases
-
-
-@pytest_asyncio.fixture(scope="module", autouse=True)
-async def weibo_scraper():
- weibo_scraper = await ScraperManager.init_weibo_scraper()
- return weibo_scraper
-
-
-async def get_item_from_url(weibo_scraper: WeiboScraper, url: str) -> dict:
- data_processor = await weibo_scraper.get_processor_by_url(url)
- item = await data_processor.get_item()
- return item
-
-
-async def get_test_data(weibo_scraper: WeiboScraper, case: str) -> Tuple[dict, dict]:
- data = await get_item_from_url(weibo_scraper=weibo_scraper, url=weibo_cases[case]["url"])
- return data, weibo_cases[case]["expected"]
-
-
-@pytest.mark.asyncio
-async def test_pure_short_text(weibo_scraper: WeiboScraper):
- data, expected = await get_test_data(weibo_scraper, "pure_short_text")
- assert True
diff --git a/tests/test_zhihu_content_processing.py b/tests/test_zhihu_content_processing.py
deleted file mode 100644
index 4e773c1..0000000
--- a/tests/test_zhihu_content_processing.py
+++ /dev/null
@@ -1,58 +0,0 @@
-import sys
-import os
-
-# Import content_processing directly to avoid pulling in the full zhihu scraper
-# which has heavy dependencies (fastfetchbot_shared, httpx, etc.)
-sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "apps", "api", "src", "services", "scrapers", "zhihu"))
-from content_processing import (
- fix_images_and_links,
- extract_references,
- unmask_zhihu_links,
-)
-
-
-def test_fix_images_replaces_data_actualsrc():
-    html = '<img src="data:image/svg+xml;base64," data-actualsrc="https://real.jpg">'
- result = fix_images_and_links(html)
- assert 'src="https://real.jpg"' in result
- assert "data-actualsrc" not in result
-
-
-def test_fix_images_preserves_normal_src():
-    html = '<img src="https://normal.jpg">'
- result = fix_images_and_links(html)
- assert 'src="https://normal.jpg"' in result
-
-
-def test_fix_images_removes_u_tags():
-    html = "Hello <u>world</u>"
-    result = fix_images_and_links(html)
-    assert "<u>" not in result
-    assert "world" in result
-
-
-def test_extract_references_with_refs():
-    html = '<p>Text<sup data-text="Ref 1" data-url="https://example.com" data-numero="1">[1]</sup></p>'
-    result = extract_references(html)
-    assert "参考" in result
-    assert "Ref 1" in result
-    assert "https://example.com" in result
-
-
-def test_extract_references_empty():
-    html = "No references here
" - result = extract_references(html) - assert result == "" - - -def test_unmask_zhihu_links(): - html = 'link' - result = unmask_zhihu_links(html) - assert "https://example.com" in result - assert "link.zhihu.com" not in result - - -def test_unmask_preserves_normal_links(): - html = 'link' - result = unmask_zhihu_links(html) - assert 'href="https://example.com"' in result diff --git a/packages/shared/tests/__init__.py b/tests/unit/__init__.py similarity index 100% rename from packages/shared/tests/__init__.py rename to tests/unit/__init__.py diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py new file mode 100644 index 0000000..ed23746 --- /dev/null +++ b/tests/unit/conftest.py @@ -0,0 +1,97 @@ +import pytest +from unittest.mock import AsyncMock, MagicMock, patch + +from fastfetchbot_shared.models.url_metadata import UrlMetadata + + +@pytest.fixture +def make_url_metadata(): + """Factory fixture to create UrlMetadata instances.""" + + def _make(source="twitter", url="https://example.com", content_type=""): + return UrlMetadata(url=url, source=source, content_type=content_type) + + return _make + + +@pytest.fixture +def sample_metadata_item_dict(): + """Minimal valid metadata_item dict.""" + return { + "url": "https://example.com/post/1", + "telegraph_url": "", + "content": "Test content
", + "text": "Test content", + "media_files": [], + "author": "testuser", + "title": "Test Title", + "author_url": "https://example.com/testuser", + "category": "twitter", + "message_type": "short", + } + + +@pytest.fixture(autouse=True) +def reset_scraper_manager(): + """Reset ScraperManager class-level state after each test.""" + yield + from fastfetchbot_shared.services.scrapers.scraper_manager import ScraperManager + + ScraperManager.bluesky_scraper = None + ScraperManager.weibo_scraper = None + ScraperManager.general_scraper = None + ScraperManager.scrapers = { + "bluesky": None, + "weibo": None, + "other": None, + "unknown": None, + } + + +@pytest.fixture +def mock_jinja2_env(): + """Patch JINJA2_ENV to return a mock template.""" + mock_template = MagicMock() + mock_template.render.return_value = "rendered
" + mock_env = MagicMock() + mock_env.get_template.return_value = mock_template + with patch( + "fastfetchbot_shared.services.scrapers.config.JINJA2_ENV", mock_env + ) as m: + yield m + + +@pytest.fixture +def mock_get_response_json(): + """Patch network.get_response_json.""" + with patch( + "fastfetchbot_shared.utils.network.get_response_json", new_callable=AsyncMock + ) as m: + yield m + + +@pytest.fixture +def mock_get_selector(): + """Patch network.get_selector.""" + with patch( + "fastfetchbot_shared.utils.network.get_selector", new_callable=AsyncMock + ) as m: + yield m + + +@pytest.fixture +def mock_get_response(): + """Patch network.get_response.""" + with patch( + "fastfetchbot_shared.utils.network.get_response", new_callable=AsyncMock + ) as m: + yield m + + +@pytest.fixture +def mock_get_redirect_url(): + """Patch network.get_redirect_url.""" + with patch( + "fastfetchbot_shared.utils.network.get_redirect_url", new_callable=AsyncMock + ) as m: + yield m diff --git a/tests/routers/__init__.py b/tests/unit/scrapers/__init__.py similarity index 100% rename from tests/routers/__init__.py rename to tests/unit/scrapers/__init__.py diff --git a/tests/unit/scrapers/test_bluesky.py b/tests/unit/scrapers/test_bluesky.py new file mode 100644 index 0000000..a5cbf1c --- /dev/null +++ b/tests/unit/scrapers/test_bluesky.py @@ -0,0 +1,656 @@ +"""Unit tests for bluesky scraper: Bluesky dataclass, BlueskyPost, BlueskyDataProcessor, BlueskyScraper.""" + +import pytest +from unittest.mock import AsyncMock, MagicMock, patch, PropertyMock +from dataclasses import dataclass + +from fastfetchbot_shared.models.metadata_item import MediaFile, MessageType + + +# --------------------------------------------------------------------------- +# Helpers – lightweight fakes for atproto types +# --------------------------------------------------------------------------- + +def _make_author(handle="alice.bsky.social", display_name="Alice", did="did:plc:abc123"): + author = MagicMock() + 
author.handle = handle + author.display_name = display_name + author.did = did + return author + + +def _make_post_view( + uri="at://did:plc:abc123/app.bsky.feed.post/rkey123", + text="Hello world", + author=None, + embed=None, + created_at="2024-01-01T00:00:00Z", +): + if author is None: + author = _make_author() + post = MagicMock() + post.uri = uri + post.author = author + post.record = MagicMock() + post.record.text = text + post.record.created_at = created_at + post.embed = embed + return post + + +def _make_thread(post=None, parent=None, replies=None): + thread = MagicMock() + thread.post = post or _make_post_view() + thread.parent = parent + thread.replies = replies + return thread + + +# --------------------------------------------------------------------------- +# Bluesky dataclass tests (bluesky/__init__.py) +# --------------------------------------------------------------------------- + +class TestBlueskyDataclass: + + def test_from_dict_basic(self): + """from_dict should populate cid/author_did from the dict.""" + obj = { + "url": "https://bsky.app/profile/alice/post/123", + "telegraph_url": "", + "content": "hi
", + "text": "hi", + "media_files": [], + "author": "Alice", + "title": "Alice's Bluesky post", + "author_url": "https://bsky.app/profile/alice", + "category": "bluesky", + "message_type": "short", + "cid": "cidvalue", + "author_did": "did:plc:abc", + } + from fastfetchbot_shared.services.scrapers.bluesky import Bluesky + + item = Bluesky.from_dict(obj) + assert item.cid == "cidvalue" + assert item.author_did == "did:plc:abc" + assert item.url == "https://bsky.app/profile/alice/post/123" + assert item.author == "Alice" + + def test_to_dict_without_retweet(self): + from fastfetchbot_shared.services.scrapers.bluesky import Bluesky + + item = Bluesky( + url="https://bsky.app/profile/alice/post/123", + telegraph_url="", + content="hi
", + text="hi", + media_files=[], + author="Alice", + title="Alice's Bluesky post", + author_url="https://bsky.app/profile/alice", + category="bluesky", + message_type=MessageType.SHORT, + cid="cidvalue", + author_did="did:plc:abc", + retweet_post=None, + ) + d = item.to_dict() + assert d["cid"] == "cidvalue" + assert d["author_did"] == "did:plc:abc" + assert "retweet_post" not in d + + def test_to_dict_with_retweet(self): + from fastfetchbot_shared.services.scrapers.bluesky import Bluesky + + retweet = Bluesky( + url="https://bsky.app/profile/bob/post/456", + telegraph_url="", + content="retweet
", + text="retweet", + media_files=[], + author="Bob", + title="Bob's Bluesky post", + author_url="https://bsky.app/profile/bob", + category="bluesky", + message_type=MessageType.SHORT, + cid="cid2", + author_did="did:plc:bob", + retweet_post=None, + ) + item = Bluesky( + url="https://bsky.app/profile/alice/post/123", + telegraph_url="", + content="hi
", + text="hi", + media_files=[], + author="Alice", + title="Alice's Bluesky post", + author_url="https://bsky.app/profile/alice", + category="bluesky", + message_type=MessageType.SHORT, + cid="cid1", + author_did="did:plc:alice", + retweet_post=retweet, + ) + d = item.to_dict() + assert "retweet_post" in d + assert d["retweet_post"]["cid"] == "cid2" + + +# --------------------------------------------------------------------------- +# Bluesky config tests +# --------------------------------------------------------------------------- + +class TestBlueskyConfig: + + def test_constants(self): + from fastfetchbot_shared.services.scrapers.bluesky.config import ( + BLUESKY_HOST, + BLUESKY_MAX_LENGTH, + ) + + assert BLUESKY_HOST == "https://bsky.app" + assert BLUESKY_MAX_LENGTH == 800 + + +# --------------------------------------------------------------------------- +# BlueskyPost tests +# --------------------------------------------------------------------------- + +class TestBlueskyPost: + + @patch("fastfetchbot_shared.services.scrapers.bluesky.scraper.BlueskyScraper") + def test_init_parses_url(self, mock_scraper_cls): + """BlueskyPost should parse handle, post_rkey, and resolve DID.""" + mock_resolver = MagicMock() + mock_resolver.handle.resolve.return_value = "did:plc:resolved" + mock_scraper_cls.id_resolver = mock_resolver + + # Patch at class level before import + with patch( + "fastfetchbot_shared.services.scrapers.bluesky.scraper.BlueskyScraper.id_resolver", + mock_resolver, + ): + from fastfetchbot_shared.services.scrapers.bluesky.scraper import BlueskyPost + + post = BlueskyPost("https://bsky.app/profile/alice.bsky.social/post/rkey123") + assert post.handle == "alice.bsky.social" + assert post.post_rkey == "rkey123" + assert post.bluesky_host == "bsky.app" + assert post.did == "did:plc:resolved" + + +# --------------------------------------------------------------------------- +# BlueskyDataProcessor tests +# 
--------------------------------------------------------------------------- + +class TestBlueskyDataProcessor: + + @pytest.fixture(autouse=True) + def _patch_templates(self): + mock_tpl = MagicMock() + mock_tpl.render.return_value = "rendered
" + with patch( + "fastfetchbot_shared.services.scrapers.bluesky.scraper.telegram_text_template", + mock_tpl, + ), patch( + "fastfetchbot_shared.services.scrapers.bluesky.scraper.content_template", + mock_tpl, + ): + self.mock_tpl = mock_tpl + yield + + @pytest.fixture + def _patch_at_uri(self): + mock_at_uri = MagicMock() + mock_at_uri.rkey = "rkey123" + with patch( + "fastfetchbot_shared.services.scrapers.bluesky.scraper.AtUri" + ) as at_uri_cls: + at_uri_cls.from_str.return_value = mock_at_uri + yield at_uri_cls + + @pytest.mark.asyncio + async def test_get_item_short_text(self, _patch_at_uri): + """get_item should return dict with SHORT message_type for short text.""" + from fastfetchbot_shared.services.scrapers.bluesky.scraper import BlueskyDataProcessor + + post = _make_post_view(text="short") + thread = _make_thread(post=post, parent=None, replies=None) + + processor = BlueskyDataProcessor("https://bsky.app/profile/alice/post/rkey123", thread) + result = await processor.get_item() + + assert isinstance(result, dict) + assert result["category"] == "bluesky" + assert result["message_type"] == "short" + + @pytest.mark.asyncio + async def test_get_item_long_text(self, _patch_at_uri): + """Text longer than BLUESKY_MAX_LENGTH should set LONG message type.""" + from fastfetchbot_shared.services.scrapers.bluesky.scraper import BlueskyDataProcessor + + # The rendered template returns "rendered
" which is short, + # but we need the combined text to exceed 800 chars. + # We mock template to return long text. + self.mock_tpl.render.return_value = "x" * 900 + + post = _make_post_view(text="x" * 900) + thread = _make_thread(post=post, parent=None, replies=None) + + processor = BlueskyDataProcessor("https://bsky.app/profile/alice/post/rkey123", thread) + result = await processor.get_item() + + assert result["message_type"] == "long" + + @pytest.mark.asyncio + async def test_resolve_thread_with_parent(self, _patch_at_uri): + """Parent posts should be collected recursively.""" + from fastfetchbot_shared.services.scrapers.bluesky.scraper import BlueskyDataProcessor + + grandparent_post = _make_post_view(text="grandparent") + grandparent_thread = _make_thread(post=grandparent_post, parent=None, replies=None) + + parent_post = _make_post_view(text="parent") + parent_thread = _make_thread(post=parent_post, parent=grandparent_thread, replies=None) + + base_post = _make_post_view(text="base") + thread = _make_thread(post=base_post, parent=parent_thread, replies=None) + + processor = BlueskyDataProcessor("https://bsky.app/profile/alice/post/rkey123", thread) + result = await processor.get_item() + assert isinstance(result, dict) + + @pytest.mark.asyncio + async def test_resolve_thread_with_replies_same_author(self, _patch_at_uri): + """Replies by the same author should be included in the combined text.""" + from fastfetchbot_shared.services.scrapers.bluesky.scraper import BlueskyDataProcessor + + author = _make_author(did="did:plc:abc123") + base_post = _make_post_view(text="base", author=author) + + reply_post = _make_post_view(text="reply", author=author) + reply_thread = _make_thread(post=reply_post) + + thread = _make_thread(post=base_post, parent=None, replies=[reply_thread]) + + processor = BlueskyDataProcessor("https://bsky.app/profile/alice/post/rkey123", thread) + result = await processor.get_item() + assert isinstance(result, dict) + + @pytest.mark.asyncio + 
async def test_resolve_thread_with_replies_different_author(self, _patch_at_uri): + """Replies by a different author should be excluded.""" + from fastfetchbot_shared.services.scrapers.bluesky.scraper import BlueskyDataProcessor + + base_author = _make_author(did="did:plc:abc123") + other_author = _make_author(did="did:plc:other") + + base_post = _make_post_view(text="base", author=base_author) + reply_post = _make_post_view(text="other reply", author=other_author) + reply_thread = _make_thread(post=reply_post) + + thread = _make_thread(post=base_post, parent=None, replies=[reply_thread]) + + processor = BlueskyDataProcessor("https://bsky.app/profile/alice/post/rkey123", thread) + result = await processor.get_item() + assert isinstance(result, dict) + + @pytest.mark.asyncio + async def test_resolve_single_post_with_images(self, _patch_at_uri): + """Posts with image embeds should have media_files populated.""" + from fastfetchbot_shared.services.scrapers.bluesky.scraper import BlueskyDataProcessor + + image_mock = MagicMock() + image_mock.fullsize = "https://cdn.bsky.app/img/feed/abc/image.jpg" + + # Use a simple namespace object instead of MagicMock to avoid __dict__ conflicts + class FakeEmbed: + def __init__(self): + self.images = [image_mock] + self.record = None + + embed = FakeEmbed() + + post = _make_post_view(text="photo post", embed=embed) + thread = _make_thread(post=post, parent=None, replies=None) + + processor = BlueskyDataProcessor("https://bsky.app/profile/alice/post/rkey123", thread) + result = await processor.get_item() + assert len(result["media_files"]) == 1 + assert result["media_files"][0]["media_type"] == "image" + + @pytest.mark.asyncio + async def test_resolve_single_post_with_retweet(self, _patch_at_uri): + """Posts with embed.record as ViewRecord should resolve retweet.""" + from fastfetchbot_shared.services.scrapers.bluesky.scraper import BlueskyDataProcessor + from atproto_client.models.app.bsky.embed.record import ViewRecord + + # Use a 
simple namespace to avoid MagicMock __dict__ issues + class FakeEmbed: + def __init__(self): + self.images = [] + self.record = ViewRecord # identity check: `is ViewRecord` + + embed = FakeEmbed() + + post = _make_post_view(text="check this out", embed=embed) + thread = _make_thread(post=post, parent=None, replies=None) + + processor = BlueskyDataProcessor("https://bsky.app/profile/alice/post/rkey123", thread) + # Mock _resolve_single_post_data entirely to avoid calling into ViewRecord as PostView + call_count = 0 + + async def side_effect(post_data): + nonlocal call_count + call_count += 1 + if call_count == 1: + return { + "url": "https://bsky.app/profile/alice/post/rkey123", + "title": "Alice's Bluesky post", + "author": "Alice", + "author_url": "https://bsky.app/profile/alice", + "text": "check this out", + "category": "bluesky", + "media_files": [], + "created_at": "2024-01-01T00:00:00Z", + "author_did": "did:plc:abc123", + "content": "rendered
", + "retweet_post": { + "url": "https://bsky.app/profile/bob/post/456", + "title": "Bob's post", + "author": "Bob", + "author_url": "https://bsky.app/profile/bob", + "text": "original post", + "category": "bluesky", + "media_files": [], + "author_did": "did:plc:bob", + "content": "original
", + }, + } + return { + "url": "https://bsky.app/profile/bob/post/456", + "title": "Bob's post", + "author": "Bob", + "author_url": "https://bsky.app/profile/bob", + "text": "original post", + "category": "bluesky", + "media_files": [], + "author_did": "did:plc:bob", + "content": "original
", + } + + with patch.object( + BlueskyDataProcessor, + "_resolve_single_post_data", + side_effect=side_effect, + ): + result = await processor.get_item() + assert isinstance(result, dict) + + @pytest.mark.asyncio + async def test_resolve_single_post_retweet_branch_executed(self, _patch_at_uri): + """Directly test _resolve_single_post_data with embed.record is ViewRecord to cover lines 141-142.""" + from fastfetchbot_shared.services.scrapers.bluesky.scraper import BlueskyDataProcessor + from atproto_client.models.app.bsky.embed.record import ViewRecord + + class FakeEmbed: + def __init__(self): + self.images = [] + self.record = ViewRecord # `is ViewRecord` will be True + + embed = FakeEmbed() + post = _make_post_view(text="quoting post", embed=embed) + + # Mock the recursive call to _resolve_single_post_data for the retweet + original_method = BlueskyDataProcessor._resolve_single_post_data + call_count = 0 + + async def patched_resolve(post_data): + nonlocal call_count + call_count += 1 + if call_count > 1: + # This is the recursive call for the retweet record + return { + "url": "https://bsky.app/profile/bob/post/456", + "title": "Bob's post", + "author": "Bob", + "author_url": "https://bsky.app/profile/bob", + "text": "retweeted content", + "category": "bluesky", + "media_files": [], + "author_did": "did:plc:bob", + "content": "retweeted
", + "created_at": "2024-01-01", + } + return await original_method(post_data) + + with patch.object( + BlueskyDataProcessor, + "_resolve_single_post_data", + side_effect=patched_resolve, + ): + result = await BlueskyDataProcessor._resolve_single_post_data(post) + assert "retweet_post" in result + assert result["retweet_post"]["author"] == "Bob" + + @pytest.mark.asyncio + async def test_resolve_single_post_no_embed(self, _patch_at_uri): + """Post without embed should have empty media_files.""" + from fastfetchbot_shared.services.scrapers.bluesky.scraper import BlueskyDataProcessor + + post = _make_post_view(text="text only", embed=None) + thread = _make_thread(post=post, parent=None, replies=None) + + processor = BlueskyDataProcessor("https://bsky.app/profile/alice/post/rkey123", thread) + result = await processor.get_item() + assert result["media_files"] == [] + + @pytest.mark.asyncio + async def test_empty_parent_posts_data_list(self, _patch_at_uri): + """When parent exists but parent_posts_data is empty after collection, no text is prepended.""" + from fastfetchbot_shared.services.scrapers.bluesky.scraper import BlueskyDataProcessor + + # Create parent with a post + parent_post = _make_post_view(text="parent text") + parent_thread = _make_thread(post=parent_post, parent=None, replies=None) + + base_post = _make_post_view(text="base text") + thread = _make_thread(post=base_post, parent=parent_thread, replies=None) + + processor = BlueskyDataProcessor("https://bsky.app/profile/alice/post/rkey123", thread) + result = await processor.get_item() + assert isinstance(result, dict) + + @pytest.mark.asyncio + async def test_empty_replies_posts_data_list(self, _patch_at_uri): + """When replies exist but none match author, replies data is empty.""" + from fastfetchbot_shared.services.scrapers.bluesky.scraper import BlueskyDataProcessor + + base_author = _make_author(did="did:plc:abc123") + other_author = _make_author(did="did:plc:other") + + base_post = 
_make_post_view(text="base", author=base_author) + reply_post = _make_post_view(text="different author reply", author=other_author) + reply_thread = _make_thread(post=reply_post) + + thread = _make_thread(post=base_post, parent=None, replies=[reply_thread]) + + processor = BlueskyDataProcessor("https://bsky.app/profile/alice/post/rkey123", thread) + result = await processor.get_item() + assert isinstance(result, dict) + + +# --------------------------------------------------------------------------- +# BlueskyScraper tests +# --------------------------------------------------------------------------- + +class TestBlueskyScraper: + + @pytest.fixture(autouse=True) + def _patch_deps(self): + """Patch atproto classes and templates at module level.""" + mock_tpl = MagicMock() + mock_tpl.render.return_value = "rendered
" + with patch( + "fastfetchbot_shared.services.scrapers.bluesky.scraper.telegram_text_template", + mock_tpl, + ), patch( + "fastfetchbot_shared.services.scrapers.bluesky.scraper.content_template", + mock_tpl, + ): + yield + + @pytest.mark.asyncio + async def test_init_with_credentials(self): + """init() should call client.login when username and password are provided.""" + from fastfetchbot_shared.services.scrapers.bluesky.scraper import BlueskyScraper + + with patch( + "fastfetchbot_shared.services.scrapers.bluesky.scraper.AsyncClient" + ) as mock_client_cls: + mock_client = AsyncMock() + mock_client_cls.return_value = mock_client + + scraper = BlueskyScraper(username="user", password="pass") + await scraper.init() + mock_client.login.assert_awaited_once_with("user", "pass") + + @pytest.mark.asyncio + async def test_init_without_credentials(self): + """init() should not call login when credentials are missing.""" + from fastfetchbot_shared.services.scrapers.bluesky.scraper import BlueskyScraper + + with patch( + "fastfetchbot_shared.services.scrapers.bluesky.scraper.AsyncClient" + ) as mock_client_cls: + mock_client = AsyncMock() + mock_client_cls.return_value = mock_client + + scraper = BlueskyScraper() + await scraper.init() + mock_client.login.assert_not_awaited() + + @pytest.mark.asyncio + async def test_get_processor_by_url(self): + """get_processor_by_url should return a BlueskyDataProcessor.""" + from fastfetchbot_shared.services.scrapers.bluesky.scraper import ( + BlueskyScraper, + BlueskyDataProcessor, + ) + + mock_resolver = MagicMock() + mock_resolver.handle.resolve.return_value = "did:plc:resolved" + + with patch( + "fastfetchbot_shared.services.scrapers.bluesky.scraper.AsyncClient" + ) as mock_client_cls, patch.object( + BlueskyScraper, "id_resolver", mock_resolver + ): + mock_client = AsyncMock() + mock_post_data = MagicMock() + mock_post_data.uri = "at://did:plc:resolved/app.bsky.feed.post/rkey123" + mock_client.get_post.return_value = 
mock_post_data + + mock_thread_data = MagicMock() + mock_thread_data.thread = _make_thread() + mock_client.get_post_thread.return_value = mock_thread_data + + mock_client_cls.return_value = mock_client + + scraper = BlueskyScraper() + processor = await scraper.get_processor_by_url( + "https://bsky.app/profile/alice.bsky.social/post/rkey123" + ) + assert isinstance(processor, BlueskyDataProcessor) + + @pytest.mark.asyncio + async def test_request_post_data_uses_did_when_available(self): + """_request_post_data should use did as profile_identify when available.""" + from fastfetchbot_shared.services.scrapers.bluesky.scraper import BlueskyScraper + + mock_resolver = MagicMock() + mock_resolver.handle.resolve.return_value = "did:plc:resolved" + + with patch( + "fastfetchbot_shared.services.scrapers.bluesky.scraper.AsyncClient" + ) as mock_client_cls, patch.object( + BlueskyScraper, "id_resolver", mock_resolver + ): + mock_client = AsyncMock() + mock_post_data = MagicMock() + mock_post_data.uri = "at://did:plc:resolved/app.bsky.feed.post/rkey123" + mock_client.get_post.return_value = mock_post_data + + mock_thread_response = MagicMock() + mock_thread_response.thread = _make_thread() + mock_client.get_post_thread.return_value = mock_thread_response + + mock_client_cls.return_value = mock_client + + scraper = BlueskyScraper() + + from fastfetchbot_shared.services.scrapers.bluesky.scraper import BlueskyPost + + bluesky_post = MagicMock(spec=BlueskyPost) + bluesky_post.did = "did:plc:resolved" + bluesky_post.handle = "alice.bsky.social" + bluesky_post.post_rkey = "rkey123" + + result = await scraper._request_post_data(bluesky_post) + mock_client.get_post.assert_awaited_once_with( + profile_identify="did:plc:resolved", post_rkey="rkey123" + ) + + @pytest.mark.asyncio + async def test_request_post_data_uses_handle_when_no_did(self): + """_request_post_data should fall back to handle when did is empty.""" + from fastfetchbot_shared.services.scrapers.bluesky.scraper import 
BlueskyScraper + + with patch( + "fastfetchbot_shared.services.scrapers.bluesky.scraper.AsyncClient" + ) as mock_client_cls: + mock_client = AsyncMock() + mock_post_data = MagicMock() + mock_post_data.uri = "at://did:plc:resolved/app.bsky.feed.post/rkey123" + mock_client.get_post.return_value = mock_post_data + + mock_thread_response = MagicMock() + mock_thread_response.thread = _make_thread() + mock_client.get_post_thread.return_value = mock_thread_response + + mock_client_cls.return_value = mock_client + + scraper = BlueskyScraper() + + bluesky_post = MagicMock() + bluesky_post.did = "" # falsy + bluesky_post.handle = "alice.bsky.social" + bluesky_post.post_rkey = "rkey123" + + result = await scraper._request_post_data(bluesky_post) + mock_client.get_post.assert_awaited_once_with( + profile_identify="alice.bsky.social", post_rkey="rkey123" + ) + + @pytest.mark.asyncio + async def test_request_post_data_exception_handling(self): + """_request_post_data should log error and return None on exception.""" + from fastfetchbot_shared.services.scrapers.bluesky.scraper import BlueskyScraper + + with patch( + "fastfetchbot_shared.services.scrapers.bluesky.scraper.AsyncClient" + ) as mock_client_cls: + mock_client = AsyncMock() + mock_client.get_post.side_effect = Exception("network error") + mock_client_cls.return_value = mock_client + + scraper = BlueskyScraper() + + bluesky_post = MagicMock() + bluesky_post.did = "did:plc:abc" + bluesky_post.handle = "alice" + bluesky_post.post_rkey = "rkey123" + + result = await scraper._request_post_data(bluesky_post) + assert result is None diff --git a/tests/unit/scrapers/test_common.py b/tests/unit/scrapers/test_common.py new file mode 100644 index 0000000..decfb26 --- /dev/null +++ b/tests/unit/scrapers/test_common.py @@ -0,0 +1,217 @@ +"""Tests for packages/shared/fastfetchbot_shared/services/scrapers/common.py""" + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from 
fastfetchbot_shared.models.url_metadata import UrlMetadata +from fastfetchbot_shared.services.scrapers.common import InfoExtractService + + +# --------------------------------------------------------------------------- +# __init__ +# --------------------------------------------------------------------------- + +class TestInfoExtractServiceInit: + def test_init_sets_all_fields(self, make_url_metadata): + url_metadata = make_url_metadata( + source="twitter", + url="https://twitter.com/user/status/123", + content_type="post", + ) + svc = InfoExtractService( + url_metadata=url_metadata, + data={"key": "val"}, + store_database=True, + store_telegraph=False, + store_document=True, + extra_kwarg="extra", + ) + assert svc.url == "https://twitter.com/user/status/123" + assert svc.content_type == "post" + assert svc.source == "twitter" + assert svc.data == {"key": "val"} + assert svc.store_database is True + assert svc.store_telegraph is False + assert svc.store_document is True + assert svc.kwargs == {"extra_kwarg": "extra"} + + def test_init_defaults(self, make_url_metadata): + url_metadata = make_url_metadata() + svc = InfoExtractService(url_metadata=url_metadata) + assert svc.data is None + assert svc.store_database is False + assert svc.store_telegraph is True + assert svc.store_document is False + assert svc.kwargs == {} + + +# --------------------------------------------------------------------------- +# category property +# --------------------------------------------------------------------------- + +class TestCategory: + def test_category_returns_source(self, make_url_metadata): + url_metadata = make_url_metadata(source="reddit") + svc = InfoExtractService(url_metadata=url_metadata) + assert svc.category == "reddit" + + +# --------------------------------------------------------------------------- +# get_item with pre-existing metadata_item (skips scraping) +# --------------------------------------------------------------------------- + +class 
TestGetItemWithExistingMetadata: + @pytest.mark.asyncio + async def test_get_item_with_metadata_skips_scraping( + self, make_url_metadata, sample_metadata_item_dict + ): + svc = InfoExtractService(url_metadata=make_url_metadata()) + result = await svc.get_item(metadata_item=sample_metadata_item_dict) + assert result["title"] == "Test Title" + + @pytest.mark.asyncio + async def test_get_item_with_metadata_strips_title(self, make_url_metadata): + svc = InfoExtractService(url_metadata=make_url_metadata()) + item = {"title": " padded title ", "url": "https://example.com"} + result = await svc.get_item(metadata_item=item) + assert result["title"] == "padded title" + + +# --------------------------------------------------------------------------- +# get_item with category in service_classes (e.g. "twitter") +# --------------------------------------------------------------------------- + +class TestGetItemServiceClasses: + @pytest.mark.asyncio + async def test_get_item_twitter_category(self, make_url_metadata): + mock_scraper_instance = MagicMock() + mock_scraper_instance.get_item = AsyncMock( + return_value={"title": " Twitter Post ", "content": "hello"} + ) + mock_scraper_class = MagicMock(return_value=mock_scraper_instance) + + svc = InfoExtractService( + url_metadata=make_url_metadata(source="twitter", url="https://twitter.com/x/1"), + data={"some": "data"}, + ) + + with patch.dict(svc.service_classes, {"twitter": mock_scraper_class}): + result = await svc.get_item() + + mock_scraper_class.assert_called_once_with( + url="https://twitter.com/x/1", data={"some": "data"} + ) + mock_scraper_instance.get_item.assert_awaited_once() + assert result["title"] == "Twitter Post" + + @pytest.mark.asyncio + async def test_get_item_zhihu_category(self, make_url_metadata): + mock_scraper_instance = MagicMock() + mock_scraper_instance.get_item = AsyncMock( + return_value={"title": "Zhihu Answer", "content": "answer"} + ) + mock_scraper_class = 
MagicMock(return_value=mock_scraper_instance) + + svc = InfoExtractService( + url_metadata=make_url_metadata(source="zhihu"), + ) + + with patch.dict(svc.service_classes, {"zhihu": mock_scraper_class}): + result = await svc.get_item() + + assert result["title"] == "Zhihu Answer" + + +# --------------------------------------------------------------------------- +# get_item with ScraperManager categories +# --------------------------------------------------------------------------- + +class TestGetItemScraperManager: + @pytest.mark.asyncio + @pytest.mark.parametrize("category", ["bluesky", "weibo", "other", "unknown"]) + async def test_get_item_scraper_manager_categories( + self, make_url_metadata, category + ): + mock_processor = MagicMock() + mock_processor.get_item = AsyncMock( + return_value={"title": f" {category} item "} + ) + + mock_scraper = MagicMock() + mock_scraper.get_processor_by_url = AsyncMock(return_value=mock_processor) + + with patch( + "fastfetchbot_shared.services.scrapers.common.ScraperManager" + ) as MockSM: + MockSM.init_scraper = AsyncMock() + MockSM.scrapers = {category: mock_scraper} + + svc = InfoExtractService( + url_metadata=make_url_metadata( + source=category, url="https://example.com/post" + ), + ) + result = await svc.get_item() + + MockSM.init_scraper.assert_awaited_once_with(category) + mock_scraper.get_processor_by_url.assert_awaited_once_with( + url="https://example.com/post" + ) + mock_processor.get_item.assert_awaited_once() + assert result["title"] == f"{category} item" + + +# --------------------------------------------------------------------------- +# get_item exception re-raise +# --------------------------------------------------------------------------- + +class TestGetItemException: + @pytest.mark.asyncio + async def test_get_item_exception_reraises(self, make_url_metadata): + mock_scraper_instance = MagicMock() + mock_scraper_instance.get_item = AsyncMock( + side_effect=RuntimeError("scraper failed") + ) + 
mock_scraper_class = MagicMock(return_value=mock_scraper_instance) + + svc = InfoExtractService( + url_metadata=make_url_metadata(source="twitter"), + ) + + with patch.dict(svc.service_classes, {"twitter": mock_scraper_class}): + with pytest.raises(RuntimeError, match="scraper failed"): + await svc.get_item() + + @pytest.mark.asyncio + async def test_get_item_scraper_manager_exception_reraises(self, make_url_metadata): + with patch( + "fastfetchbot_shared.services.scrapers.common.ScraperManager" + ) as MockSM: + MockSM.init_scraper = AsyncMock( + side_effect=ValueError("init failed") + ) + + svc = InfoExtractService( + url_metadata=make_url_metadata(source="bluesky"), + ) + with pytest.raises(ValueError, match="init failed"): + await svc.get_item() + + +# --------------------------------------------------------------------------- +# process_item +# --------------------------------------------------------------------------- + +class TestProcessItem: + @pytest.mark.asyncio + async def test_process_item_strips_title(self, make_url_metadata): + svc = InfoExtractService(url_metadata=make_url_metadata()) + result = await svc.process_item({"title": " hello world "}) + assert result["title"] == "hello world" + + @pytest.mark.asyncio + async def test_process_item_no_strip_needed(self, make_url_metadata): + svc = InfoExtractService(url_metadata=make_url_metadata()) + result = await svc.process_item({"title": "clean"}) + assert result["title"] == "clean" diff --git a/tests/unit/scrapers/test_douban.py b/tests/unit/scrapers/test_douban.py new file mode 100644 index 0000000..0af01ec --- /dev/null +++ b/tests/unit/scrapers/test_douban.py @@ -0,0 +1,662 @@ +"""Unit tests for douban scraper: DoubanType enum, Douban class with all methods.""" + +import pytest +from unittest.mock import AsyncMock, MagicMock, patch, call +from lxml import etree + +from fastfetchbot_shared.models.metadata_item import MessageType + + +# 
--------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_selector_with_xpaths(xpath_map: dict): + """Create a mock lxml selector that responds to xpath() calls.""" + selector = MagicMock() + + def xpath_side_effect(expr): + for key, val in xpath_map.items(): + if key in expr: + return val + return "" + + selector.xpath = MagicMock(side_effect=xpath_side_effect) + return selector + + +def _make_html_element(html_str: str): + """Create a real lxml element from HTML string for tostring calls.""" + tree = etree.HTML(html_str) + return tree + + +@pytest.fixture(autouse=True) +def _patch_douban_templates(): + mock_tpl = MagicMock() + mock_tpl.render.return_value = "rendered
" + with patch( + "fastfetchbot_shared.services.scrapers.douban.short_text_template", mock_tpl + ), patch( + "fastfetchbot_shared.services.scrapers.douban.content_template", mock_tpl + ): + yield mock_tpl + + +@pytest.fixture +def _patch_get_selector(): + with patch( + "fastfetchbot_shared.services.scrapers.douban.get_selector", + new_callable=AsyncMock, + ) as m: + yield m + + +# --------------------------------------------------------------------------- +# DoubanType enum tests +# --------------------------------------------------------------------------- + +class TestDoubanType: + + def test_enum_values(self): + from fastfetchbot_shared.services.scrapers.douban import DoubanType + + assert DoubanType.MOVIE_REVIEW == "movie_review" + assert DoubanType.BOOK_REVIEW == "book_review" + assert DoubanType.NOTE == "note" + assert DoubanType.STATUS == "status" + assert DoubanType.GROUP == "group" + assert DoubanType.UNKNOWN == "unknown" + + def test_enum_is_string(self): + from fastfetchbot_shared.services.scrapers.douban import DoubanType + + assert isinstance(DoubanType.MOVIE_REVIEW, str) + + +# --------------------------------------------------------------------------- +# Douban.__init__ tests +# --------------------------------------------------------------------------- + +class TestDoubanInit: + + def test_default_fields(self): + from fastfetchbot_shared.services.scrapers.douban import Douban, DoubanType + + d = Douban("https://www.douban.com/note/12345/") + assert d.url == "https://www.douban.com/note/12345/" + assert d.title == "" + assert d.author == "" + assert d.author_url == "" + assert d.text == "" + assert d.content == "" + assert d.media_files == [] + assert d.category == "douban" + assert d.message_type == MessageType.SHORT + assert d.item_title is None + assert d.item_url is None + assert d.group_name is None + assert d.group_url is None + assert d.douban_type == DoubanType.UNKNOWN + assert d.text_group is None + assert d.raw_content is None + assert 
d.date is None + + def test_cookie_passed_to_headers(self): + from fastfetchbot_shared.services.scrapers.douban import Douban + + d = Douban("https://www.douban.com/note/12345/", cookie="session=abc") + assert d.headers["Cookie"] == "session=abc" + + def test_no_cookie(self): + from fastfetchbot_shared.services.scrapers.douban import Douban + + d = Douban("https://www.douban.com/note/12345/") + assert d.headers["Cookie"] == "" + + +# --------------------------------------------------------------------------- +# check_douban_type tests +# --------------------------------------------------------------------------- + +class TestCheckDoubanType: + + def test_note_type(self): + from fastfetchbot_shared.services.scrapers.douban import Douban, DoubanType + + d = Douban("https://www.douban.com/note/12345/") + d.check_douban_type() + assert d.douban_type == DoubanType.NOTE + + def test_status_type_with_status_path(self): + from fastfetchbot_shared.services.scrapers.douban import Douban, DoubanType + + d = Douban("https://www.douban.com/status/12345/") + d.check_douban_type() + assert d.douban_type == DoubanType.STATUS + + def test_status_type_with_people_status_path(self): + from fastfetchbot_shared.services.scrapers.douban import Douban, DoubanType + + d = Douban("https://www.douban.com/people/12345/status/67890") + d.check_douban_type() + assert d.douban_type == DoubanType.STATUS + + def test_group_type(self): + from fastfetchbot_shared.services.scrapers.douban import Douban, DoubanType + + d = Douban("https://www.douban.com/group/topic/12345/") + d.check_douban_type() + assert d.douban_type == DoubanType.GROUP + + def test_movie_review_direct(self): + from fastfetchbot_shared.services.scrapers.douban import Douban, DoubanType + + d = Douban("https://movie.douban.com/review/12345/") + d.check_douban_type() + assert d.douban_type == DoubanType.MOVIE_REVIEW + + def test_book_review_direct(self): + from fastfetchbot_shared.services.scrapers.douban import Douban, DoubanType + 
+ d = Douban("https://book.douban.com/review/12345/") + d.check_douban_type() + assert d.douban_type == DoubanType.BOOK_REVIEW + + def test_m_douban_movie_review(self): + """m.douban.com with /movie/review path should map to MOVIE_REVIEW.""" + from fastfetchbot_shared.services.scrapers.douban import Douban, DoubanType + + d = Douban("https://m.douban.com/movie/review/12345/") + d.check_douban_type() + assert d.douban_type == DoubanType.MOVIE_REVIEW + # URL should be rewritten to desktop domain + assert "movie.douban.com" in d.url + assert "/review/12345/" in d.url + + def test_m_douban_book_review(self): + """m.douban.com with /book/review path should map to BOOK_REVIEW.""" + from fastfetchbot_shared.services.scrapers.douban import Douban, DoubanType + + d = Douban("https://m.douban.com/book/review/12345/") + d.check_douban_type() + assert d.douban_type == DoubanType.BOOK_REVIEW + assert "book.douban.com" in d.url + + def test_m_douban_note(self): + """m.douban.com with /note/ path should map to NOTE.""" + from fastfetchbot_shared.services.scrapers.douban import Douban, DoubanType + + d = Douban("https://m.douban.com/note/12345/") + d.check_douban_type() + assert d.douban_type == DoubanType.NOTE + + def test_unknown_type(self): + from fastfetchbot_shared.services.scrapers.douban import Douban, DoubanType + + d = Douban("https://www.douban.com/people/12345/") + d.check_douban_type() + assert d.douban_type == DoubanType.UNKNOWN + + def test_url_rewritten(self): + """URL should be rewritten to https://{host}{path} format.""" + from fastfetchbot_shared.services.scrapers.douban import Douban + + d = Douban("https://www.douban.com/note/12345/?query=1") + d.check_douban_type() + assert d.url == "https://www.douban.com/note/12345/" + + def test_m_douban_non_review(self): + """m.douban.com with non-review path should still rewrite host.""" + from fastfetchbot_shared.services.scrapers.douban import Douban, DoubanType + + d = Douban("https://m.douban.com/group/topic/12345/") 
+ d.check_douban_type() + assert d.douban_type == DoubanType.GROUP + assert "douban.com" in d.url + + +# --------------------------------------------------------------------------- +# get_douban_item tests +# --------------------------------------------------------------------------- + +class TestGetDoubanItem: + + @pytest.mark.asyncio + async def test_get_item_returns_dict(self, _patch_get_selector, _patch_douban_templates): + from fastfetchbot_shared.services.scrapers.douban import Douban + + # Build a real lxml tree for xpath calls + html = """ + +Content here
Content here
Short
Short
", + ), patch( + "fastfetchbot_shared.services.scrapers.douban.get_html_text_length", + return_value=5, + ): + d = Douban("https://www.douban.com/note/12345/") + await d.get_douban() + assert d.message_type == MessageType.SHORT + + @pytest.mark.asyncio + async def test_short_text_ending_with_newline_stripped(self, _patch_get_selector, _patch_douban_templates): + """If short_text ends with newline, it should be stripped.""" + from fastfetchbot_shared.services.scrapers.douban import Douban, DoubanType + + html = """ + +Content
Note body text
" not in d.raw_content + assert "" not in d.raw_content + + +# --------------------------------------------------------------------------- +# _get_douban_group_article tests +# --------------------------------------------------------------------------- + +class TestGetDoubanGroupArticle: + + @pytest.mark.asyncio + async def test_group_article_fields(self, _patch_get_selector): + from fastfetchbot_shared.services.scrapers.douban import Douban + + html = """ + +
Group article body

Text
inner text
" not in result
+ assert "" not in result
+ assert " Text Para 1 Para 2 Para 3 Hello World Just a single line Content body hello cleaned raw html cleaned html tags
+ assert " " in data["content"]
+
+ @pytest.mark.asyncio
+ @patch(
+ "fastfetchbot_shared.services.scrapers.general.base.BaseGeneralDataProcessor.parsing_article_body_by_llm",
+ new_callable=AsyncMock,
+ return_value=" c " + "x" * 1000 + " {long_text} {long_text} Hello Hello text text keep keep para html html extracted raw extracted raw raw raw raw ok ok ok short hello hello abcdefghij abcdefghij ab " + "x" * 100 + " abc html " + "x" * 1000 + " " + "x" * 1000 + " html h " + "a" * 500 + " " + "a" * 500 + " " + "x" * 1000 + " ab raw " + "a" * 500 + " " + "a" * 500 + " " + "a" * 500 + " " + "a" * 500 + " " + "a" * 500 + " " + "a" * 500 + " " + "a" * 500 + " " + "a" * 500 + " html Full article content here. Full article content here. Content Full content Full content body body body body body html b b b b
\nPara 2
\nPara 3")
+ assert "
\n World ")
+ assert "
\\n should be a single paragraph."""
+ from fastfetchbot_shared.services.scrapers.douban import Douban
+
+ result = Douban.raw_content_to_html("Just a single line")
+ assert result == "Full Flow Note
+
+ s
h
linkbold"
+ "strongitalicemunderline"
+ "
"
+ "quote
"
+ "code
"
+ "
"
+ )
+ result = BaseGeneralDataProcessor.sanitize_html(html)
+ for tag in ["p", "h1", "a", "b", "strong", "i", "em", "u",
+ "ul", "ol", "li", "blockquote", "pre", "code",
+ "img", "br", "table", "thead", "tbody", "tr", "th", "td"]:
+ assert f"<{tag}" in result
+
+
+# ---------------------------------------------------------------------------
+# parsing_article_body_by_llm
+# ---------------------------------------------------------------------------
+
+
+class TestParsingArticleBodyByLlm:
+ @pytest.mark.asyncio
+ async def test_empty_input(self):
+ from fastfetchbot_shared.services.scrapers.general.base import BaseGeneralDataProcessor
+ result = await BaseGeneralDataProcessor.parsing_article_body_by_llm("")
+ assert result == ""
+
+ @pytest.mark.asyncio
+ async def test_none_input(self):
+ from fastfetchbot_shared.services.scrapers.general.base import BaseGeneralDataProcessor
+ result = await BaseGeneralDataProcessor.parsing_article_body_by_llm(None)
+ assert result is None
+
+ @pytest.mark.asyncio
+ @patch("fastfetchbot_shared.services.scrapers.general.base.OPENAI_API_KEY", None)
+ async def test_no_api_key(self):
+ from fastfetchbot_shared.services.scrapers.general.base import BaseGeneralDataProcessor
+ result = await BaseGeneralDataProcessor.parsing_article_body_by_llm("h d alert(1)",
+ "media_files": [],
+ "content": "",
+ "status": True,
+ }
+ )
+ assert "