diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1e8b3bc..e089376 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -4,6 +4,8 @@ on: push: branches: - main + tags: + - 'v*' concurrency: group: fastfetchbot @@ -24,6 +26,9 @@ jobs: - service: telegram-bot dockerfile: apps/telegram-bot/Dockerfile image_suffix: tgbot + - service: worker + dockerfile: apps/worker/Dockerfile + image_suffix: worker steps: - name: Checkout uses: actions/checkout@v4 @@ -33,13 +38,29 @@ jobs: - name: Check commit message id: check_message run: | - MESSAGE=$(git log --format=%B -n 1 ${{ github.sha }}) + MESSAGE=$(git log --format=%B -n 1 "$GITHUB_SHA") if [[ "$MESSAGE" == *"[github-action]"* ]]; then echo "skip=true" >> "$GITHUB_OUTPUT" else echo "skip=false" >> "$GITHUB_OUTPUT" fi + - name: Determine Environment Tags + id: env_vars + run: | + # Check if the workflow was triggered by a tag or a branch push + if [[ "$GITHUB_REF" == refs/tags/* ]]; then + # Production Environment (Tag Trigger) + VERSION_TAG=${GITHUB_REF#refs/tags/} + echo "docker_tag=latest" >> "$GITHUB_OUTPUT" + echo "version_tag=$VERSION_TAG" >> "$GITHUB_OUTPUT" + else + # Staging Environment (Main Branch Trigger) + echo "docker_tag=stage" >> "$GITHUB_OUTPUT" + # Use the short commit SHA as a secondary tag for tracking + echo "version_tag=$(git rev-parse --short HEAD)" >> "$GITHUB_OUTPUT" + fi + - name: Set up QEMU uses: docker/setup-qemu-action@v3 @@ -67,12 +88,24 @@ jobs: build-args: | APP_VERSION=${{ env.APP_VERSION }} tags: | - ghcr.io/${{ github.repository_owner }}/fastfetchbot-${{ matrix.image_suffix }}:latest + ghcr.io/${{ github.repository_owner }}/fastfetchbot-${{ matrix.image_suffix }}:${{ steps.env_vars.outputs.docker_tag }} + ghcr.io/${{ github.repository_owner }}/fastfetchbot-${{ matrix.image_suffix }}:${{ steps.env_vars.outputs.version_tag }} - deploy: - needs: build - runs-on: ubuntu-latest - steps: - - name: Trigger Watchtower deployment - run: | - curl -H "Authorization: Bearer ${{ secrets.WATCHTOWER_TOKEN }}" ${{ secrets.WATCHTOWER_WEBHOOK_URL }} +# deploy: +# needs: build +# runs-on: ubuntu-latest +# steps: +# - name: Trigger Watchtower deployment +# run: | +# # Route the webhook to the appropriate server based on the trigger +# if [[ "$GITHUB_REF" == refs/tags/* ]]; then +# echo "Deploying to Production..." +# TOKEN="${{ secrets.PROD_WATCHTOWER_TOKEN }}" +# WEBHOOK_URL="${{ secrets.PROD_WATCHTOWER_WEBHOOK_URL }}" +# else +# echo "Deploying to Staging..." +# TOKEN="${{ secrets.STAGE_WATCHTOWER_TOKEN }}" +# WEBHOOK_URL="${{ secrets.STAGE_WATCHTOWER_WEBHOOK_URL }}" +# fi +# +# curl -H "Authorization: Bearer $TOKEN" "$WEBHOOK_URL" \ No newline at end of file diff --git a/.gitignore b/.gitignore index d9fa219..e9d466b 100644 --- a/.gitignore +++ b/.gitignore @@ -258,3 +258,4 @@ conf/* /.run/ .DS_Store /.claude/ +/apps/worker/conf/ diff --git a/CLAUDE.md b/CLAUDE.md index 207372d..09eb0e0 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -2,143 +2,162 @@ ## Project Overview -FastFetchBot is a social media content fetching API built with FastAPI, designed to scrape and archive content from various social media platforms. It includes a Telegram Bot as the default client interface and supports multiple social media platforms including Twitter, Weibo, Xiaohongshu, Reddit, Bluesky, Instagram, Zhihu, Douban, YouTube, and Bilibili. +FastFetchBot is a social media content fetching service built as a **UV workspace monorepo** with three microservices: a FastAPI server (API), a Telegram Bot client, and a Celery worker for file operations. It scrapes and archives content from various social media platforms including Twitter, Weibo, Xiaohongshu, Reddit, Bluesky, Instagram, Zhihu, Douban, YouTube, and Bilibili. + +## Architecture + +``` +FastFetchBot/ +├── packages/shared/ # fastfetchbot-shared: common models, utilities, logger +├── packages/file-export/ # fastfetchbot-file-export: video download, PDF export, transcription +├── apps/api/ # FastAPI server: scrapers, storage, routing +├── apps/telegram-bot/ # Telegram Bot: webhook/polling, message handling +├── apps/worker/ # Celery worker: async file operations (video, PDF, audio) +├── app/ # Legacy re-export wrappers (backward compatibility) +├── pyproject.toml # Root workspace configuration +└── uv.lock # Lockfile for the entire workspace +``` + +| Service | Package Name | Port | Entry Point | +|---------|-------------|------|-------------| +| **API Server** (`apps/api/src/`) | `fastfetchbot-api` | 10450 | `gunicorn -k uvicorn.workers.UvicornWorker src.main:app --preload` | +| **Telegram Bot** (`apps/telegram-bot/core/`) | `fastfetchbot-telegram-bot` | 10451 | `python -m core.main` | +| **Worker** (`apps/worker/worker_core/`) | `fastfetchbot-worker` | — | `celery -A worker_core.main:app worker --loglevel=info --concurrency=2` | +| **Shared Library** (`packages/shared/fastfetchbot_shared/`) | `fastfetchbot-shared` | — | — | +| **File Export Library** (`packages/file-export/fastfetchbot_file_export/`) | `fastfetchbot-file-export` | — | — | + +The Telegram Bot communicates with the API server over HTTP (`API_SERVER_URL`). In Docker, this is `http://api:10450`. + +### API Server (`apps/api/src/`) + +- **`main.py`** — FastAPI app setup, Sentry integration, lifecycle management +- **`config.py`** — Environment variable handling, platform credentials +- **`routers/`** — `scraper.py` (generic endpoint), `scraper_routers.py` (platform-specific), `inoreader.py`, `wechat.py` +- **`services/scrapers/`** — `scraper_manager.py` orchestrates platform scrapers (twitter, weibo, bluesky, xiaohongshu, reddit, instagram, zhihu, douban, threads, wechat, general) +- **`services/file_export/`** — PDF generation, audio transcription (OpenAI), video download +- **`services/amazon/s3.py`** — S3 storage integration +- **`services/telegraph/`** — Telegraph content publishing +- **`templates/`** — Jinja2 templates for platform-specific output formatting + +### Telegram Bot (`apps/telegram-bot/core/`) + +- **`main.py`** — Entry point +- **`api_client.py`** — HTTP client calling the API server +- **`handlers/`** — `messages.py`, `buttons.py`, `url_process.py` +- **`services/`** — `bot_app.py`, `message_sender.py`, `constants.py` +- **`webhook/server.py`** — Webhook/polling server +- **`templates/`** — Jinja2 templates for bot messages + +### Shared Library (`packages/shared/fastfetchbot_shared/`) + +- **`config.py`** — URL patterns (SOCIAL_MEDIA_WEBSITE_PATTERNS, VIDEO_WEBSITE_PATTERNS, BANNED_PATTERNS) +- **`models/`** — `classes.py` (NamedBytesIO), `metadata_item.py`, `telegraph_item.py`, `url_metadata.py` +- **`utils/`** — `parse.py` (URL parsing, HTML processing, `get_env_bool`), `image.py`, `logger.py`, `network.py` + +### Legacy `app/` Directory + +Re-export wrappers providing backward compatibility. Actual code lives in `apps/api/src/` and `packages/shared/`. For example, `app/config.py` imports `get_env_bool` from `fastfetchbot_shared.utils.parse`. ## Development Commands ### Package Management -- `uv sync` - Install all dependencies (including dev) -- `uv sync --no-dev` - Install production dependencies only -- `uv sync --extra windows` - Install with Windows extras -- `uv lock` - Regenerate the lock file after pyproject.toml changes +- `uv sync` — Install all dependencies (including dev) +- `uv lock` — Regenerate the lock file after pyproject.toml changes -### Running the Application -- **Production**: `uv run gunicorn -k uvicorn.workers.UvicornWorker app.main:app --preload` -- **Development**: `uv run gunicorn -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:10450 wsgi:app` +### Running Locally -### Docker Commands -- `docker-compose up -d` - Start all services (FastFetchBot, Telegram Bot API, File Exporter) -- `docker-compose build` - Build the FastFetchBot container +```bash +# API server +cd apps/api +uv run gunicorn -k uvicorn.workers.UvicornWorker src.main:app --preload -> **uv version in Docker**: The Dockerfile pins uv to `0.8.18` via `COPY --from=ghcr.io/astral-sh/uv:0.8.18`. -> To upgrade, update that tag in `Dockerfile` line 24 and verify the build with `docker build -t fastfetchbot-test .`. +# Telegram Bot (separate terminal) +cd apps/telegram-bot +uv run python -m core.main +``` ### Testing -- `uv run pytest` - Run all tests -- `uv run pytest tests/test_bluesky.py` - Run specific test file -- `uv run pytest -v` - Run tests with verbose output +- `uv run pytest` — Run all tests +- `uv run pytest tests/test_bluesky.py` — Run specific test file +- `uv run pytest -v` — Verbose output ### Code Formatting -- `uv run black .` - Format all Python code using Black formatter - -## Architecture Overview - -### Core Components - -**FastAPI Application (`app/main.py`)** -- Main application entry point with FastAPI instance -- Configures routers, middleware, and lifecycle management -- Integrates Sentry for error monitoring -- Handles Telegram bot webhook setup on startup - -**Scraper Architecture (`app/services/scrapers/`)** -- `ScraperManager`: Centralized manager for all platform scrapers -- Individual scraper modules for each platform (twitter, weibo, bluesky, etc.) -- Each scraper implements platform-specific content extraction logic -- Common scraping utilities in `common.py` - -**Router Structure (`app/routers/`)** -- Platform-specific routers (twitter.py, weibo.py, etc.) -- Generic scraper router for unified API endpoints -- Telegram bot webhook handler -- Feed processing and Inoreader integration - -**Data Models (`app/models/`)** -- `classes.py`: Core data structures (NamedBytesIO) -- `database_model.py`: MongoDB/Beanie models -- Platform-specific metadata models -- Telegram chat and Telegraph item models - -**Configuration (`app/config.py`)** -- Comprehensive environment variable handling -- Platform-specific API credentials and cookies -- Database, storage, and service configurations -- Template and localization settings - -### Key Services - -**Telegram Bot Service (`app/services/telegram_bot/`)** -- Handles webhook setup and message processing -- Integrates with local Telegram Bot API server for large file support -- Channel and admin management - -**File Export Service (`app/services/file_export/`)** -- Document export (PDF generation) -- Audio transcription (OpenAI integration) -- Video download capabilities - -**Storage Services** -- Amazon S3 integration for media storage -- Local file system management -- Telegraph integration for content publishing - -### Platform Support - -**Supported Social Media Platforms:** -- Twitter (requires ct0 and auth_token cookies) -- Weibo (requires cookies) -- Xiaohongshu (requires a1, webid, websession cookies) -- Bluesky (requires username/password) -- Reddit (requires API credentials) -- Instagram (requires X-RapidAPI key) -- Zhihu (requires cookies in conf/zhihu_cookies.json) -- Douban -- YouTube, Bilibili (video content) +- `uv run black .` — Format all Python code + +### Docker + +```bash +# Start all services (uses pre-built images from GHCR) +docker-compose up -d + +# Build locally +docker build -f apps/api/Dockerfile -t fastfetchbot-api . +docker build -f apps/telegram-bot/Dockerfile -t fastfetchbot-telegram-bot . +docker build -f apps/worker/Dockerfile -t fastfetchbot-worker . +``` + +> **uv version in Docker**: All three Dockerfiles pin uv to `0.10.4` via `COPY --from=ghcr.io/astral-sh/uv:0.10.4`. +> To upgrade, update that tag in `apps/api/Dockerfile`, `apps/telegram-bot/Dockerfile`, and `apps/worker/Dockerfile`. + +Docker Compose services (see `docker-compose.template.yml`): +- **api** — API server (port 10450) +- **telegram-bot** — Telegram Bot (port 10451) +- **telegram-bot-api** — Local Telegram Bot API for large file support (ports 8081-8082) +- **redis** — Message broker and result backend for Celery (port 6379) +- **worker** — Celery worker for file operations (video download, PDF export, audio transcription) ## Environment Configuration -### Required Variables -- `BASE_URL`: Server base URL -- `TELEGRAM_BOT_TOKEN`: Telegram bot token -- `TELEGRAM_CHAT_ID`: Default chat ID for bot +See `template.env` for a complete reference. Key variables: + +### Required +| Variable | Description | +|----------|-------------| +| `BASE_URL` | Public server domain (used for webhook URL construction) | +| `TELEGRAM_BOT_TOKEN` | Bot token from @BotFather | +| `TELEGRAM_CHAT_ID` | Default chat ID for the bot | -### Critical Setup Notes -- Most social media scrapers require authentication cookies/tokens +### Service Communication (Docker) +| Variable | Default | Description | +|----------|---------|-------------| +| `API_SERVER_URL` | `http://localhost:10450` | URL the Telegram Bot uses to call the API. `http://api:10450` in Docker. | +| `TELEGRAM_BOT_CALLBACK_URL` | `http://localhost:10451` | URL the API uses to call the Telegram Bot. `http://telegram-bot:10451` in Docker. | +| `TELEGRAM_BOT_MODE` | `polling` | `polling` (dev) or `webhook` (production with HTTPS) | + +### Platform Credentials +- Most scrapers require authentication cookies/tokens - Use browser extension "Get cookies.txt LOCALLY" to extract cookies - Store Zhihu cookies in `conf/zhihu_cookies.json` -- Template environment file available at `template.env` +- See `template.env` for all platform-specific variables (Twitter, Weibo, Xiaohongshu, Reddit, Instagram, Bluesky, etc.) -### Database Integration -- Optional MongoDB integration (set `DATABASE_ON=true`) -- Uses Beanie ODM for async MongoDB operations -- Database initialization handled in app lifecycle +### Database +- Optional MongoDB integration (`DATABASE_ON=true`) +- Uses Beanie ODM for async operations -### Docker Services -- **fastfetchbot**: Main application container -- **telegram-bot-api**: Local Telegram Bot API for large file support -- **fast-yt-downloader**: Separate service for video downloads +## CI/CD -## Development Guidelines +GitHub Actions (`.github/workflows/ci.yml`) builds and pushes all three images on push to `main`: +- `ghcr.io/aturret/fastfetchbot-api:latest` +- `ghcr.io/aturret/fastfetchbot-tgbot:latest` +- `ghcr.io/aturret/fastfetchbot-worker:latest` -### Cookie Management -- Platform scrapers depend on valid authentication cookies -- Store sensitive cookies in environment variables, never in code -- Test scraper functionality after cookie updates +Deployment is triggered via Watchtower webhook after builds complete. Include `[github-action]` in a commit message to skip the build. -### Adding New Platform Support -1. Create new scraper module in `app/services/scrapers/[platform]/` +## Development Guidelines + +### Adding a New Platform Scraper +1. Create scraper module in `apps/api/src/services/scrapers//` 2. Implement scraper class following existing patterns -3. Add platform-specific router in `app/routers/` -4. Update ScraperManager to include new scraper -5. Add configuration variables in `app/config.py` +3. Add platform-specific router in `apps/api/src/routers/` +4. Register the scraper in `ScraperManager` +5. Add configuration variables in `apps/api/src/config.py` 6. Create tests in `tests/cases/` -### Template System -- Jinja2 templates in `app/templates/` for content formatting -- Platform-specific templates for different output formats -- Supports internationalization via gettext - -### Error Handling and Logging -- Loguru for comprehensive logging -- Sentry integration for production error monitoring -- Platform-specific error handling in scrapers \ No newline at end of file +### Key Conventions +- Shared models and utilities go in `packages/shared/fastfetchbot_shared/` +- API-specific code goes in `apps/api/src/` +- Telegram bot code goes in `apps/telegram-bot/core/` +- The bot communicates with the API only via HTTP — no direct imports of API code +- Jinja2 templates for output formatting, with i18n support via Babel +- Loguru for logging, Sentry for production error monitoring +- Store sensitive cookies/tokens in environment variables, never in code diff --git a/app/auth.py b/app/auth.py deleted file mode 100644 index 815218e..0000000 --- a/app/auth.py +++ /dev/null @@ -1,33 +0,0 @@ -import secrets - -from fastapi import HTTPException, Security, status -from fastapi.security.api_key import APIKeyQuery, APIKeyHeader - -from app.config import API_KEY_NAME, API_KEY, TELEGRAM_BOT_SECRET_TOKEN - -api_key_query = APIKeyQuery(name=API_KEY_NAME, auto_error=False) -telegram_secret_token_query = APIKeyHeader( - name="X-Telegram-Bot-Api-Secret-Token", auto_error=False -) - - -def verify_key(input_key: str, true_key: str): - if api_key_query is None or not secrets.compare_digest(input_key, true_key): - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, detail="API Key Invalid" - ) - - -def verify_api_key(api_key_query: str = Security(api_key_query)): - verify_key(api_key_query, API_KEY) - - -def verify_telegram_api_header( - api_header_query: str = Security(telegram_secret_token_query), -): - if api_header_query is None or not secrets.compare_digest( - api_header_query, TELEGRAM_BOT_SECRET_TOKEN - ): - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, detail="API Key Invalid" - ) diff --git a/app/config.py b/app/config.py deleted file mode 100644 index ca0db5d..0000000 --- a/app/config.py +++ /dev/null @@ -1,233 +0,0 @@ -import json -import os -import tempfile - -from jinja2 import Environment, FileSystemLoader -import gettext -import secrets - -from fastfetchbot_shared.utils.parse import get_env_bool - -env = os.environ -current_directory = os.path.dirname(os.path.abspath(__file__)) -conf_dir = os.path.join(current_directory, "..", "conf") - -# FastAPI environment variables -BASE_URL = env.get("BASE_URL", "localhost") -API_KEY_NAME = env.get("API_KEY_NAME", "pwd") -API_KEY = env.get("API_KEY", secrets.token_urlsafe(32)) - -# Filesystem environment variables -TEMP_DIR = env.get("TEMP_DIR", tempfile.gettempdir()) -WORK_DIR = env.get("WORK_DIR", os.getcwd()) -DOWNLOAD_DIR = env.get("DOWNLOAD_DIR", os.path.join(WORK_DIR, "download")) -DEBUG_MODE = get_env_bool(env, "DEBUG_MODE", False) - -# Logging environment variables -LOG_FILE_PATH = env.get("LOG_FILE_PATH", TEMP_DIR) -LOG_LEVEL = env.get("LOG_LEVEL", "DEBUG") - -# MongoDB environment variables -DATABASE_ON = get_env_bool(env, "DATABASE_ON", False) -MONGODB_PORT = int(env.get("MONGODB_PORT", 27017)) or 27017 -MONGODB_HOST = env.get("MONGODB_HOST", "localhost") -MONGODB_URL = env.get("MONGODB_URL", f"mongodb://{MONGODB_HOST}:{MONGODB_PORT}") - -# Telegram bot environment variables -TELEGRAM_BOT_ON = get_env_bool(env, "TELEGRAM_BOT_ON", True) -TELEGRAM_BOT_MODE = env.get("TELEGRAM_BOT_MODE", "polling") -TELEGRAM_BOT_TOKEN = env.get("TELEGRAM_BOT_TOKEN", None) -TELEGRAM_BOT_SECRET_TOKEN = env.get( - "TELEGRAM_BOT_SECRET_TOKEN", secrets.token_urlsafe(32) -) - -TELEGRAM_CHANNEL_ID = [] -telegram_channel_id = env.get("TELEGRAM_CHANNEL_ID", "").split(",") -for single_telegram_channel_id in telegram_channel_id: - if single_telegram_channel_id.startswith("@"): - TELEGRAM_CHANNEL_ID.append(single_telegram_channel_id) - elif single_telegram_channel_id.startswith("-1"): - TELEGRAM_CHANNEL_ID.append(int(single_telegram_channel_id)) -if len(TELEGRAM_CHANNEL_ID) == 0: - TELEGRAM_CHANNEL_ID = None -telebot_debug_channel = env.get("TELEBOT_DEBUG_CHANNEL", "") -if telebot_debug_channel.startswith("@"): - TELEBOT_DEBUG_CHANNEL = telebot_debug_channel -elif telebot_debug_channel.startswith("-1"): - TELEBOT_DEBUG_CHANNEL = int(telebot_debug_channel) -else: - TELEBOT_DEBUG_CHANNEL = None -telegram_channel_admin_list = env.get("TELEGRAM_CHANNEL_ADMIN_LIST", "") -TELEGRAM_CHANNEL_ADMIN_LIST = [ - admin_id for admin_id in telegram_channel_admin_list.split(",") -] -if not TELEGRAM_CHANNEL_ADMIN_LIST: - TELEGRAM_CHANNEL_ADMIN_LIST = None - -TELEGRAM_WEBHOOK_URL = f"https://{BASE_URL}/telegram/bot/webhook" - -TELEBOT_API_SERVER_HOST = env.get("TELEBOT_API_SERVER_HOST", None) -TELEBOT_API_SERVER_PORT = env.get("TELEBOT_API_SERVER_PORT", None) -TELEBOT_API_SERVER = ( - f"http://{TELEBOT_API_SERVER_HOST}:{TELEBOT_API_SERVER_PORT}" + "/bot" - if (TELEBOT_API_SERVER_HOST and TELEBOT_API_SERVER_PORT) - else "https://api.telegram.org/bot" -) -TELEBOT_API_SERVER_FILE = ( - f"http://{TELEBOT_API_SERVER_HOST}:{TELEBOT_API_SERVER_PORT}" + "/file/bot" - if (TELEBOT_API_SERVER_HOST and TELEBOT_API_SERVER_PORT) - else "https://api.telegram.org/file/bot" -) -TELEBOT_LOCAL_FILE_MODE = ( - False if TELEBOT_API_SERVER == "https://api.telegram.org/bot" else True -) -TELEBOT_CONNECT_TIMEOUT = int(env.get("TELEGRAM_CONNECT_TIMEOUT", 15)) or 15 -TELEBOT_READ_TIMEOUT = int(env.get("TELEGRAM_READ_TIMEOUT", 60)) or 60 -TELEBOT_WRITE_TIMEOUT = int(env.get("TELEGRAM_WRITE_TIMEOUT", 60)) or 60 -TELEBOT_MAX_RETRY = int(env.get("TELEGRAM_MAX_RETRY", 5)) or 5 -TELEGRAM_IMAGE_DIMENSION_LIMIT = int(env.get("TELEGRAM_IMAGE_SIZE_LIMIT", 1600)) or 1600 -TELEGRAM_IMAGE_SIZE_LIMIT = ( - int(env.get("TELEGRAM_IMAGE_SIZE_LIMIT", 5242880)) or 5242880 -) -telegram_group_message_ban_list = env.get("TELEGRAM_GROUP_MESSAGE_BAN_LIST", "") -telegram_bot_message_ban_list = env.get("TELEGRAM_BOT_MESSAGE_BAN_LIST", "") - - -def ban_list_resolver(ban_list_string: str) -> list: - ban_list = ban_list_string.split(",") - for item in ban_list: - if item == "social_media": - ban_list.extend( - [ - "weibo", - "twitter", - "instagram", - "zhihu", - "douban", - "wechat", - "xiaohongshu", - "reddit", - ] - ) - elif item == "video": - ban_list.extend(["youtube", "bilibili"]) - return ban_list - - -TELEGRAM_GROUP_MESSAGE_BAN_LIST = ban_list_resolver(telegram_group_message_ban_list) -TELEGRAM_BOT_MESSAGE_BAN_LIST = ban_list_resolver(telegram_bot_message_ban_list) -telegraph_token_list = env.get("TELEGRAPH_TOKEN_LIST", "") -TELEGRAPH_TOKEN_LIST = telegraph_token_list.split(",") if telegraph_token_list else None - -# Youtube-dl environment variables -FILE_EXPORTER_ON = get_env_bool(env, "FILE_EXPORTER_ON", True) -FILE_EXPORTER_HOST = env.get("FILE_EXPORTER_HOST", "fast-yt-downloader") -FILE_EXPORTER_PORT = env.get("FILE_EXPORTER_PORT", "4000") -FILE_EXPORTER_URL = f"http://{FILE_EXPORTER_HOST}:{FILE_EXPORTER_PORT}" -DOWNLOAD_VIDEO_TIMEOUT = env.get("DOWNLOAD_VIDEO_TIMEOUT", 600) - -# Services environment variables -templates_directory = os.path.join(current_directory, "templates") -JINJA2_ENV = Environment( - loader=FileSystemLoader(templates_directory), lstrip_blocks=True, trim_blocks=True -) -TEMPLATE_LANGUAGE = env.get( - "TEMPLATE_LANGUAGE", "zh_CN" -) # It is a workaround for translation system - -# X-RapidAPI (for instagram) -X_RAPIDAPI_KEY = env.get("X_RAPIDAPI_KEY", None) - -# Twitter -TWITTER_EMAIL = env.get("TWITTER_EMAIL", None) -TWITTER_PASSWORD = env.get("TWITTER_PASSWORD", None) -TWITTER_USERNAME = env.get("TWITTER_USERNAME", None) -TWITTER_CT0 = env.get("TWITTER_CT0", None) -TWITTER_AUTH_TOKEN = env.get("TWITTER_AUTH_TOKEN", None) -TWITTER_COOKIES = { - "ct0": TWITTER_CT0, - "auth_token": TWITTER_AUTH_TOKEN, -} - -# Bluesky -BLUESKY_USERNAME = env.get("BLUESKY_USERNAME", None) -BLUESKY_PASSWORD = env.get("BLUESKY_PASSWORD", None) - -# Weibo -WEIBO_COOKIES = env.get("WEIBO_COOKIES", None) - -# Xiaohongshu -XIAOHONGSHU_A1 = env.get("XIAOHONGSHU_A1", None) -XIAOHONGSHU_WEBID = env.get("XIAOHONGSHU_WEBID", None) -XIAOHONGSHU_WEBSESSION = env.get("XIAOHONGSHU_WEBSESSION", None) -XIAOHONGSHU_COOKIES = { - "a1": XIAOHONGSHU_A1, - "web_id": XIAOHONGSHU_WEBID, - "web_session": XIAOHONGSHU_WEBSESSION, -} -XHS_PHONE_LIST = env.get("XHS_PHONE_LIST", "").split(",") -XHS_IP_PROXY_LIST = env.get("XHS_IP_PROXY_LIST", "").split(",") -XHS_ENABLE_IP_PROXY = get_env_bool(env, "XHS_ENABLE_IP_PROXY", False) -XHS_SAVE_LOGIN_STATE = get_env_bool(env, "XHS_SAVE_LOGIN_STATE", True) - -# Zhihu -FXZHIHU_HOST = env.get("FXZHIHU_HOST", "fxzhihu.com") - -zhihu_cookie_path = os.path.join(conf_dir, "zhihu_cookies.json") -if os.path.exists(zhihu_cookie_path): - try: - with open(zhihu_cookie_path, "r") as f: - ZHIHU_COOKIES_JSON = json.load(f) - except json.JSONDecodeError: - print("Error: The file is not in a valid JSON format.") - ZHIHU_COOKIES_JSON = None - except FileNotFoundError: - print("Error: The file does not exist.") - ZHIHU_COOKIES_JSON = None -else: - print("Error: We cannot find it.") - ZHIHU_COOKIES_JSON = None - -# Reddit -REDDIT_CLIENT_ID = env.get("REDDIT_CLIENT_ID", None) -REDDIT_CLIENT_SECRET = env.get("REDDIT_CLIENT_SECRET", None) -REDDIT_PASSWORD = env.get("REDDIT_PASSWORD", None) -REDDIT_USERNAME = env.get("REDDIT_USERNAME", None) - -# AWS storage -AWS_STORAGE_ON = get_env_bool(env, "AWS_STORAGE_ON", False) -AWS_ACCESS_KEY_ID = env.get("AWS_ACCESS_KEY_ID", None) -AWS_SECRET_ACCESS_KEY = env.get("AWS_SECRET_ACCESS_KEY", None) -AWS_S3_BUCKET_NAME = env.get("AWS_S3_BUCKET_NAME", "") -AWS_REGION_NAME = env.get("AWS_REGION_NAME", "") -AWS_DOMAIN_HOST = env.get("AWS_DOMAIN_HOST", None) -if not (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY and AWS_S3_BUCKET_NAME): - AWS_STORAGE_ON = False -INOREADER_APP_ID = env.get("INOREADER_APP_ID", None) -INOREADER_APP_KEY = env.get("INOREADER_APP_KEY", None) -INOREADER_EMAIL = env.get("INOREADER_EMAIL", None) -INOREADER_PASSWORD = env.get("INOREADER_PASSWORD", None) - -# Open AI API -OPENAI_API_KEY = env.get("OPENAI_API_KEY", None) - -# General webpage scraping -GENERAL_SCRAPING_ON = get_env_bool(env, "GENERAL_SCRAPING_ON", False) -GENERAL_SCRAPING_API = env.get("GENERAL_SCRAPING_API", "FIRECRAWL") - -# Firecrawl API -FIRECRAWL_API_URL = os.getenv("FIRECRAWL_API_URL", "") -FIRECRAWL_API_KEY = os.getenv("FIRECRAWL_API_KEY", "") -FIRECRAWL_WAIT_FOR = int(env.get("FIRECRAWL_WAIT_FOR", 3000)) # milliseconds to wait for JS rendering - - -# Zyte API -ZYTE_API_KEY = env.get("ZYTE_API_KEY", None) - -# Locale directories environment variables -localedir = os.path.join(os.path.dirname(__file__), "locale") -translation = gettext.translation("messages", localedir=localedir, fallback=True) -_ = translation.gettext - -# Utils environment variables -HTTP_REQUEST_TIMEOUT = env.get("HTTP_REQUEST_TIMEOUT", 30) \ No newline at end of file diff --git a/app/database.py b/app/database.py deleted file mode 100644 index f78488a..0000000 --- a/app/database.py +++ /dev/null @@ -1,37 +0,0 @@ -from typing import Optional, Union, List - -from motor.motor_asyncio import AsyncIOMotorClient -from beanie import init_beanie, Document, Indexed - -from app.config import MONGODB_URL -from app.models.database_model import document_list -from app.utils.logger import logger - - -async def startup() -> None: - client = AsyncIOMotorClient(MONGODB_URL) - await init_beanie(database=client["telegram_bot"], document_models=document_list) - - -async def shutdown() -> None: - pass - - -async def save_instances(instances: Union[Document, List[Document]], *args) -> None: - if instances is None: - raise TypeError("instances must be a Model or a list of Model") - - if isinstance(instances, Document): - instance_type = type(instances) - await instance_type.insert(instances) - elif isinstance(instances, list): - instance_type = type(instances[0]) - await instance_type.insert_many(instances) - else: - raise TypeError("instances must be a Model or a list of Model") - - for arg in args: - if not isinstance(arg, Document): - raise TypeError("args must be a Model") - instance_type = type(arg) - await instance_type.insert_one(arg) diff --git a/app/main.py b/app/main.py deleted file mode 100644 index 1fae5b2..0000000 --- a/app/main.py +++ /dev/null @@ -1,72 +0,0 @@ -import asyncio - -import sentry_sdk - -from fastapi import FastAPI, Request -from contextlib import asynccontextmanager -from starlette.middleware.base import BaseHTTPMiddleware - -from app import auth, database -from app.routers import telegram_bot, inoreader, scraper_routers, scraper -from app.services import telegram_bot as telegram_bot_service -from app.config import TELEGRAM_BOT_TOKEN, DATABASE_ON -from app.utils.logger import logger - -SENTRY_DSN = "" - -# https://docs.sentry.io/platforms/python/guides/fastapi/ -sentry_sdk.init( - dsn=SENTRY_DSN, - # Set traces_sample_rate to 1.0 to capture 100% - # of transactions for performance monitoring. - # We recommend adjusting this value in production, - traces_sample_rate=1.0, -) - -started = False -lock = asyncio.Lock() - - -@asynccontextmanager -async def lifespan(app: FastAPI): - global started - async with lock: - if not started: - started = True - await telegram_bot_service.set_webhook() - await telegram_bot_service.startup() - if DATABASE_ON: - await database.startup() - try: - yield - finally: - if DATABASE_ON: - await database.shutdown() - await telegram_bot_service.shutdown() - - -class LogMiddleware(BaseHTTPMiddleware): - def __init__(self, app): - super().__init__(app) - - async def dispatch(self, request: Request, call_next): - logger.info(f"{request.method} {request.url}") - response = await call_next(request) - return response - - -def create_app(): - fastapi_app = FastAPI(lifespan=lifespan) - fastapi_app.add_middleware(LogMiddleware) - if TELEGRAM_BOT_TOKEN is not None: - fastapi_app.include_router(telegram_bot.router) - else: - logger.warning("Telegram bot token not set, telegram bot disabled") - fastapi_app.include_router(inoreader.router) - fastapi_app.include_router(scraper.router) - for router in scraper_routers.scraper_routers: - fastapi_app.include_router(router) - return fastapi_app - - -app = create_app() diff --git a/app/models/classes.py b/app/models/classes.py deleted file mode 100644 index e89bcf7..0000000 --- a/app/models/classes.py +++ /dev/null @@ -1,2 +0,0 @@ -# Re-export from shared package -from fastfetchbot_shared.models.classes import NamedBytesIO # noqa: F401 diff --git a/app/models/database_model.py b/app/models/database_model.py deleted file mode 100644 index 7143d8d..0000000 --- a/app/models/database_model.py +++ /dev/null @@ -1,43 +0,0 @@ -from typing import Optional, Any -from datetime import datetime - -from pydantic import BaseModel, Field -from beanie import Document, Indexed, Insert, after_event, before_event - -from app.models.metadata_item import MediaFile, MessageType -from app.models.telegram_chat import document_list as telegram_chat_document_list -from app.utils.logger import logger -from app.utils.parse import get_html_text_length - - -class Metadata(Document): - title: str = Field(default="untitled") - message_type: MessageType = MessageType.SHORT - url: str - author: Optional[str] = None - author_url: Optional[str] = None - text: Optional[str] = None - text_length: Optional[int] = Field(ge=0) - content: Optional[str] = None - content_length: Optional[int] = Field(ge=0) - category: Optional[str] = None - source: Optional[str] = None - media_files: Optional[list[MediaFile]] = None - telegraph_url: Optional[str] = None - timestamp: datetime = Field(default_factory=datetime.utcnow) - scrape_status: bool = False - - @before_event(Insert) - def get_text_length(self): - self.text_length = get_html_text_length(self.text) - self.content_length = get_html_text_length(self.content) - - # - @staticmethod - def from_dict(obj: Any) -> "Metadata": - assert isinstance(obj, dict) - return Metadata(**obj) - - -document_list = [Metadata] -document_list.extend(telegram_chat_document_list) diff --git a/app/models/media_type.py b/app/models/media_type.py deleted file mode 100644 index 8b1c761..0000000 --- a/app/models/media_type.py +++ /dev/null @@ -1,7 +0,0 @@ -from enum import Enum - - -class MediaType(Enum): - PHOTO = 1 - VIDEO = 2 - ANIMATION = 3 diff --git a/app/models/metadata_item.py b/app/models/metadata_item.py deleted file mode 100644 index 5bce9fa..0000000 --- a/app/models/metadata_item.py +++ /dev/null @@ -1,12 +0,0 @@ -# Re-export from shared package -from fastfetchbot_shared.models.metadata_item import * # noqa: F401,F403 -from fastfetchbot_shared.models.metadata_item import ( # noqa: F401 - MetadataItem, - MediaFile, - MessageType, - from_str, - from_list, - to_class, - metadata_item_from_dict, - metadata_item_to_dict, -) diff --git a/app/models/telegram_chat.py b/app/models/telegram_chat.py deleted file mode 100644 index 8261c96..0000000 --- a/app/models/telegram_chat.py +++ /dev/null @@ -1,33 +0,0 @@ -from typing import Optional, Any -from datetime import datetime - -from pydantic import BaseModel, Field -from beanie import Document, Indexed - - -class TelegramUser(BaseModel): - id: int - is_bot: bool - first_name: str - last_name: Optional[str] = None - username: Optional[str] = None - language_code: Optional[str] = None - - -class TelegramChat(BaseModel): - id: int - type: str - title: Optional[str] = None - username: Optional[str] = None - first_name: Optional[str] = None - last_name: Optional[str] = None - - -class TelegramMessage(Document): - date: Indexed(datetime) = Field(default_factory=datetime.utcnow) - chat: TelegramChat - user: TelegramUser - text: str = Field(default="unknown") - - -document_list = [TelegramMessage] diff --git a/app/models/telegraph_item.py b/app/models/telegraph_item.py deleted file mode 100644 index 2b4b2f0..0000000 --- a/app/models/telegraph_item.py +++ /dev/null @@ -1,7 +0,0 @@ -# Re-export from shared package -from fastfetchbot_shared.models.telegraph_item import * # noqa: F401,F403 -from fastfetchbot_shared.models.telegraph_item import ( # noqa: F401 - TelegraphItem, - telegraph_item_from_dict, - telegraph_item_to_dict, -) diff --git a/app/models/url_metadata.py b/app/models/url_metadata.py deleted file mode 100644 index 020d120..0000000 --- a/app/models/url_metadata.py +++ /dev/null @@ -1,7 +0,0 @@ -# Re-export from shared package -from fastfetchbot_shared.models.url_metadata import * # noqa: F401,F403 -from fastfetchbot_shared.models.url_metadata import ( # noqa: F401 - UrlMetadata, - url_metadata_from_dict, - url_metadata_to_dict, -) diff --git a/app/routers/feed_push.py b/app/routers/feed_push.py deleted file mode 100644 index 997fdb1..0000000 --- a/app/routers/feed_push.py +++ /dev/null @@ -1,53 +0,0 @@ -# TODO: this script is now unused, will be removed in the future - -from fastapi import APIRouter -from fastapi.requests import Request - -from app.config import TELEGRAM_CHANNEL_ID -from app.services.telegram_bot import send_item_message -from app.services.scrapers.common import InfoExtractService -from fastapi import Security -from app.auth import verify_api_key -from app.utils.logger import logger -from app.utils.parse import get_url_metadata - -router = APIRouter(prefix="/feedPush") - - -async def get_feed_item(url: str, channel_id: str, **kwargs): - try: - channel_id = int(channel_id) if channel_id.startswith("-") else channel_id - url_metadata = await get_url_metadata(url) - item = InfoExtractService(url_metadata, **kwargs) - metadata_item = await item.get_item() - if channel_id not in TELEGRAM_CHANNEL_ID: - logger.error(f"channel_id {channel_id} not found") - return - await send_item_message(metadata_item, chat_id=channel_id) - except Exception as e: - logger.error(f"Error while getting item: {e}") - - -@router.post("/", dependencies=[Security(verify_api_key)]) -async def push_feed_item( - request: Request, -): - try: - data = await request.json() - params = request.query_params - url = ( - data.get("url") - or data.get("aurl") - or params.get("url") - or params.get("aurl") - ) - if not url: - return f"Error: url is required" - channel_id = data.get("channelId") or params.get("channelId") - if not channel_id: - return f"Error: channelId is required" - kwargs = data.get("kwargs", {}) - await get_feed_item(url, channel_id, **kwargs) - return "ok" - except Exception as e: - return f"Error: {e}" diff --git a/app/routers/inoreader.py b/app/routers/inoreader.py deleted file mode 100644 index dd5d671..0000000 --- a/app/routers/inoreader.py +++ /dev/null @@ -1,38 +0,0 @@ -from fastapi import APIRouter -from fastapi.requests import Request - -from app.config import INOREADER_APP_ID, INOREADER_APP_KEY -from app.services.inoreader import Inoreader -from app.services.inoreader.telegram_process import ( - get_inoreader_item_async, - process_inoreader_data, - default_telegram_channel_id -) -from fastapi import Security -from app.auth import verify_api_key - -router = APIRouter(prefix="/inoreader") - - -async def get_inoreader_webhook_data(data: dict): - result = data["items"] - return result - - -@router.post("/triggerAsync", dependencies=[Security(verify_api_key)]) -async def inoreader_trigger_webhook(request: Request): - if not INOREADER_APP_ID or not INOREADER_APP_KEY: - return "inoreader app id or key not set" - params = request.query_params - await get_inoreader_item_async(trigger=True, params=params) - return "ok" - - -@router.post("/webhook", dependencies=[Security(verify_api_key)]) -async def inoreader_tag_webhook(request: Request): - data = await request.json() - data = await Inoreader.process_items_data(data) - params = request.query_params - telegram_channel_id = params.get("channel_id", default_telegram_channel_id) - await process_inoreader_data(data=data, use_inoreader_content=True, telegram_channel_id=telegram_channel_id) - return "ok" diff --git a/app/routers/scraper.py b/app/routers/scraper.py deleted file mode 100644 index ff1f576..0000000 --- a/app/routers/scraper.py +++ /dev/null @@ -1,37 +0,0 @@ -import asyncio - -from fastapi import APIRouter -from fastapi.requests import Request - -from app.config import API_KEY_NAME -from app.services.scrapers.common import InfoExtractService -from fastapi import Security -from app.auth import verify_api_key -from app.utils.logger import logger -from app.utils.parse import get_url_metadata - -router = APIRouter(prefix="/scraper") - - -@router.post("/getItem", dependencies=[Security(verify_api_key)]) -async def get_item_route(request: Request): - logger.debug("A scraper getItem request received") - query_params = dict(request.query_params) - url = query_params.pop("url") - ban_list = query_params.pop("ban_list", None) - logger.debug(f"get_item_route: url: {url}, query_params: {query_params}") - if API_KEY_NAME in query_params: - query_params.pop(API_KEY_NAME) - url_metadata = await get_url_metadata(url, ban_list) - item = InfoExtractService(url_metadata, **query_params) - result = await item.get_item() - logger.debug(f"getItem result: {result}") - return result - - -@router.post("/getUrlMetadata", dependencies=[Security(verify_api_key)]) -async def get_url_metadata_route(request: Request): - url = request.query_params.get("url") - ban_list = request.query_params.get("ban_list") - url_metadata = await get_url_metadata(url, ban_list) - return url_metadata.to_dict() diff --git a/app/routers/scraper_routers.py b/app/routers/scraper_routers.py deleted file mode 100644 index 66316c7..0000000 --- a/app/routers/scraper_routers.py +++ /dev/null @@ -1,6 +0,0 @@ -from .wechat import router as wechat_router - - -scraper_routers = [ - wechat_router, -] diff --git a/app/routers/telegram_bot.py b/app/routers/telegram_bot.py deleted file mode 100644 index 3a4f9c9..0000000 --- a/app/routers/telegram_bot.py +++ /dev/null @@ -1,34 +0,0 @@ -import asyncio - -from fastapi import APIRouter, HTTPException -from fastapi.requests import Request - -from app.services.telegram_bot import set_webhook, process_telegram_update -from app.config import TELEGRAM_WEBHOOK_URL, TELEGRAM_BOT_SECRET_TOKEN -from app.utils.logger import logger -from fastapi import Security, BackgroundTasks -from app.auth import verify_api_key, verify_telegram_api_header - -router = APIRouter(prefix="/telegram") - - -@router.post("/bot/webhook", dependencies=[Security(verify_telegram_api_header)]) -async def telegram_bot_webhook(request: Request, background_tasks: BackgroundTasks): - logger.debug("A telegram bot webhook received") - data = await request.json() - background_tasks.add_task(process_telegram_update, data) - logger.debug(f"telegram bot webhook data received, background task added: {data}") - return "ok" - - -@router.get("/bot/set_webhook", dependencies=[Security(verify_api_key)]) -async def telegram_bot_set_webhook(): - # mask api key - logger.debug( - f"set telegram webhook: {TELEGRAM_WEBHOOK_URL}\nsecret token: {TELEGRAM_BOT_SECRET_TOKEN[:2]}{'*' * (len(TELEGRAM_BOT_SECRET_TOKEN) - 4)}{TELEGRAM_BOT_SECRET_TOKEN[-2:]}" - ) - if await set_webhook(): - return "ok" - else: - logger.error("set telegram webhook failed") - raise HTTPException(status_code=500, detail="set telegram webhook failed") diff --git a/app/routers/twitter.py b/app/routers/twitter.py deleted file mode 100644 index b15763c..0000000 --- a/app/routers/twitter.py +++ /dev/null @@ -1,20 +0,0 @@ -from fastapi import APIRouter - -from app.services.scrapers.common import InfoExtractService -from fastapi import Security -from app.auth import verify_api_key - - -router = APIRouter(prefix="/twitter") - - -@router.post("/repost", dependencies=[Security(verify_api_key)]) -async def twitter_repost_webhook(url: str): - url_metadata = { - "url": url, - "type": "social_media", - "source": "twitter", - } - item = InfoExtractService(url_metadata) - await item.get_item() - return "ok" diff --git a/app/routers/wechat.py b/app/routers/wechat.py deleted file mode 100644 index fb17df1..0000000 --- a/app/routers/wechat.py +++ /dev/null @@ -1,29 +0,0 @@ -from fastapi import APIRouter -from fastapi.requests import Request - -from app.models.url_metadata import UrlMetadata -from app.services.scrapers.common import InfoExtractService -from fastapi import Security -from app.auth import verify_api_key - -router = APIRouter(prefix="/wechat") - - -@router.post("/gzh", dependencies=[Security(verify_api_key)]) -async def wechat_gzh_scrape(request: Request): - url = request.query_params.get("url") - if url: - url_metadata = UrlMetadata.from_dict({ - "url": url, - "type": "social_media", - "source": "wechat", - }) - else: - customized_url_metadata = request.json() - if customized_url_metadata: - url_metadata = UrlMetadata.from_dict(customized_url_metadata) - else: - return "url or url metadata not found" - item = InfoExtractService(url_metadata) - result = await item.get_item() - return result diff --git a/app/services/amazon/s3.py b/app/services/amazon/s3.py deleted file mode 100644 index 4cd347a..0000000 --- a/app/services/amazon/s3.py +++ /dev/null @@ -1,67 +0,0 @@ -import asyncio -import uuid -from datetime import datetime -from urllib.parse import urlparse, quote - -import aiofiles.os -from pathlib import Path - -import aioboto3 -from botocore.exceptions import ClientError - -from app.utils.logger import logger -from app.utils.network import download_file_to_local -from app.config import AWS_S3_BUCKET_NAME, AWS_REGION_NAME, AWS_DOMAIN_HOST - -session = aioboto3.Session() -image_url_host = ( - AWS_DOMAIN_HOST - if AWS_DOMAIN_HOST - else f"{AWS_S3_BUCKET_NAME}.s3.{AWS_REGION_NAME}.amazonaws.com" -) - - -async def download_and_upload(url: str, referer: str = None, suite: str = "test") -> str: - urlparser = urlparse(url) - file_name = (urlparser.netloc + urlparser.path).replace("/", "-") - local_path = await download_file_to_local(url=url, referer=referer, file_name=file_name) - local_path = Path(local_path) - file_name = local_path.name - if not local_path: - return "" - s3_path = await upload( - suite=suite, - staging_path=local_path, - file_name=file_name, - ) - await aiofiles.os.remove(local_path) - return s3_path - - -async def upload( - staging_path: Path, - bucket: str = AWS_S3_BUCKET_NAME, - suite: str = "test", - release: str = datetime.now().strftime("%Y-%m-%d"), - file_name: str = None, -) -> str: - if not file_name: - file_name = uuid.uuid4().hex - blob_s3_key = f"{suite}/{release}/{file_name}" - async with session.client("s3") as s3: - try: - with staging_path.open("rb") as spfp: - logger.info(f"Uploading {blob_s3_key}") - await s3.upload_fileobj( - spfp, - bucket, - blob_s3_key, - ) - logger.info(f"Uploaded {file_name} to {suite}/{release}") - except Exception as e: - logger.error(f"Failed to upload {file_name} to {suite}/{release}, {e}") - return "" - image_url = f"https://{image_url_host}/{blob_s3_key}" - urlparser = urlparse(image_url) - quoted_url = urlparser.scheme + "://" + urlparser.netloc + quote(urlparser.path) - return quoted_url diff --git a/app/services/file_export/__init__.py b/app/services/file_export/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/app/services/file_export/audio_transcribe/__init__.py b/app/services/file_export/audio_transcribe/__init__.py deleted file mode 100644 index 2290b3e..0000000 --- a/app/services/file_export/audio_transcribe/__init__.py +++ /dev/null @@ -1,30 +0,0 @@ -import httpx - -from app.config import OPENAI_API_KEY, FILE_EXPORTER_URL, DOWNLOAD_VIDEO_TIMEOUT -from app.utils.logger import logger -from app.utils.parse import wrap_text_into_html - -TRANSCRIBE_MODEL = "whisper-1" -SEGMENT_LENGTH = 5 * 60 - - -class AudioTranscribe: - def __init__(self, audio_file: str): - self.audio_file = audio_file - - async def transcribe(self): - return await self._get_audio_text(self.audio_file) - - @staticmethod - async def _get_audio_text(audio_file: str): - async with httpx.AsyncClient() as client: - body = { - "audio_file": audio_file, - "openai_api_key": OPENAI_API_KEY, - } - request_url = FILE_EXPORTER_URL + "/transcribe" - response = await client.post( - url=request_url, json=body, timeout=DOWNLOAD_VIDEO_TIMEOUT - ) - transcript = response.json().get("transcript") - return transcript diff --git a/app/services/file_export/document_export/__init__.py b/app/services/file_export/document_export/__init__.py deleted file mode 100644 index 282167d..0000000 --- a/app/services/file_export/document_export/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -from . import pdf_export - - -class DocumentExport(object): - def __init__(self, document): - self.document = document - - def export(self): - if self.document["type"] == "pdf": - return pdf_export.PdfExport(self.document["content"]).export() diff --git a/app/services/file_export/document_export/pdf_export.py b/app/services/file_export/document_export/pdf_export.py deleted file mode 100644 index 22d5df7..0000000 --- a/app/services/file_export/document_export/pdf_export.py +++ /dev/null @@ -1,109 +0,0 @@ -import asyncio -import functools - -# import gc -import os -import uuid -from pathlib import Path - -import aiofiles -import aiofiles.os -import httpx -from bs4 import BeautifulSoup - -from app.config import DOWNLOAD_DIR, FILE_EXPORTER_URL, DOWNLOAD_VIDEO_TIMEOUT, TEMP_DIR, AWS_STORAGE_ON -from app.services.amazon.s3 import upload as upload_to_s3 -from app.utils.logger import logger - -current_directory = os.path.dirname(os.path.abspath(__file__)) - -PDF_STYLESHEET = os.path.join(current_directory, "pdf_export.css") - - -async def upload_file_to_s3(output_filename): - return await upload_to_s3( - staging_path=output_filename, - suite="documents", - file_name=output_filename.name, - ) - - -class PdfExport: - def __init__(self, title: str, html_string: str = None): - self.title = title - self.html_string = html_string - - async def export(self, method: str = "file") -> str: - body = { - "method": method - } - html_string = self.wrap_html_string(self.html_string) - if method == "string": - body["html_string"] = html_string, - logger.debug( - f""" - html_string: {html_string} - """ - ) - elif method == "file": - filename = f"{self.title}-{uuid.uuid4()}.html" - filename = os.path.join(TEMP_DIR, filename) - async with aiofiles.open( - filename, "w", encoding="utf-8" - ) as f: - await f.write(html_string) - html_file = filename - logger.debug(html_file) - body["html_file"] = html_file - output_filename = f"{self.title}-{uuid.uuid4()}.pdf" - body["output_filename"] = output_filename - - async with httpx.AsyncClient() as client: - request_url = FILE_EXPORTER_URL + "/pdfExport" - logger.info(f"requesting pdf export from pdf server: {body}") - resp = await client.post( - request_url, json=body, timeout=DOWNLOAD_VIDEO_TIMEOUT - ) - output_filename = resp.json().get("output_filename") - logger.info(f"pdf export success: {output_filename}") - await aiofiles.os.remove(html_file) - if AWS_STORAGE_ON: - local_filename = output_filename - output_filename = await upload_file_to_s3(Path(output_filename)) - await aiofiles.os.remove(local_filename) - return output_filename - - @staticmethod - def wrap_html_string(html_string: str) -> str: - soup = BeautifulSoup( - '' - '', - "html.parser", - ) - soup.body.append(BeautifulSoup(html_string, "html.parser")) - for tag in soup.find_all(True): - if "style" in tag.attrs: - del tag["style"] - for style_tag in soup.find_all("style"): - style_tag.decompose() - return soup.prettify() - - # @staticmethod - # async def convert_html_to_pdf( - # html_string: str, css_string: str, output_filename: str - # ) -> None: - # font_config = FontConfiguration() - # css_item = CSS(string=css_string, font_config=font_config) - # html_item = HTML(string=html_string) - # loop = asyncio.get_event_loop() - # pdf_obj = await loop.run_in_executor( - # None, - # functools.partial( - # html_item.write_pdf, output_filename, stylesheets=[css_item] - # ), - # ) - # del font_config - # del css_item - # del html_item - # del pdf_obj - # gc.collect() diff --git a/app/services/file_export/document_export/simsun.ttc b/app/services/file_export/document_export/simsun.ttc deleted file mode 100644 index 6ca8de3..0000000 Binary files a/app/services/file_export/document_export/simsun.ttc and /dev/null differ diff --git a/app/services/file_export/video_download/__init__.py b/app/services/file_export/video_download/__init__.py deleted file mode 100644 index ca3bd4e..0000000 --- a/app/services/file_export/video_download/__init__.py +++ /dev/null @@ -1,232 +0,0 @@ -from typing import Any, Optional - -import httpx -from urllib.parse import urlparse, parse_qs - -from app.models.metadata_item import MetadataItem, MessageType, MediaFile -from app.services.file_export.audio_transcribe import AudioTranscribe -from app.config import FILE_EXPORTER_URL, DOWNLOAD_VIDEO_TIMEOUT -from app.utils.parse import unix_timestamp_to_utc, second_to_time, wrap_text_into_html -from app.utils.logger import logger -from app.config import JINJA2_ENV - -video_info_template = JINJA2_ENV.get_template("video_info.jinja2") - - -class VideoDownloader(MetadataItem): - def __init__( - self, - url: str, - category: str, - data: Optional[Any] = None, - download: bool = True, - audio_only: bool = False, - hd: bool = False, - transcribe: bool = False, - **kwargs, - ): - self.extractor = category - self.url = url - self.author_url = "" - self.download = download - self.audio_only = audio_only - self.transcribe = transcribe - self.hd = hd - self.message_type = MessageType.SHORT - self.file_path = None - # metadata variables - self.category = category - self.media_files = [] - # auxiliary variables - self.created = None - self.duration = None - - @classmethod - async def create(cls, *args, **kwargs): - instance = cls(*args, **kwargs) - instance.url = await instance._parse_url(instance.url) - return instance - - async def get_item(self) -> dict: - self.url = await self._parse_url(self.url) - await self.get_video() - return self.to_dict() - - async def get_video(self) -> None: - content_info = await self.get_video_info() - self.file_path = content_info["file_path"] - video_info_funcs = { - "youtube": self._youtube_info_parse, - "bilibili": self._bilibili_info_parse, - } - meta_info = video_info_funcs[self.extractor](content_info) - self._video_info_formatting(meta_info) - # AI transcribe - if self.transcribe: - audio_content_info = await self.get_video_info(audio_only=True) - audio_file_path = audio_content_info["file_path"] - audio_transcribe = AudioTranscribe(audio_file_path) - transcribe_text = await audio_transcribe.transcribe() - if self.download is False: - self.message_type = MessageType.LONG - self.text += "\nAI全文摘录:" + transcribe_text - self.content += "
" + wrap_text_into_html(transcribe_text) - - async def _parse_url(self, url: str) -> str: - async def _get_redirected_url(original_url: str) -> str: - async with httpx.AsyncClient(follow_redirects=False) as client: - resp = await client.get(original_url) - if resp.status_code == 200: - original_url = resp.url - elif resp.status_code == 302: - original_url = resp.headers["Location"] - return original_url - - def _remove_youtube_link_tracing(original_url: str) -> str: - original_url_parser = urlparse(original_url) - original_url_hostname = str(original_url_parser.hostname) - - if "youtu.be" in original_url_hostname: - # remove all queries - original_url = original_url.split("?")[0] - if "youtube.com" in original_url_hostname: - # remove all queries except "?v=" part - original_url = original_url_parser.scheme + "://" + original_url_parser.netloc + original_url_parser.path - if original_url_parser.query: - v_part_query = [item for item in original_url_parser.query.split("&") if "v=" in item] - if v_part_query: - original_url += "?" + v_part_query[0] - return original_url - - def _remove_bilibili_link_tracing(original_url: str) -> str: - original_url_parser = urlparse(original_url) - original_url_hostname = str(original_url_parser.hostname) - query_dict = parse_qs(original_url_parser.query) - bilibili_p_query_string = "?p=" + query_dict["p"][0] if 'p' in query_dict else "" - - if "bilibili.com" in original_url_hostname: - original_url = original_url_parser.scheme + "://" + original_url_parser.netloc + original_url_parser.path - return original_url + bilibili_p_query_string - - logger.info(f"parsing original video url: {url} for {self.extractor}") - - url_parser = urlparse(url) - url_hostname = str(url_parser.hostname) - - if self.extractor == "bilibili": - if "b23.tv" in url_hostname: - url = await _get_redirected_url(url) - if "m.bilibili.com" in url_hostname: - url = url.replace("m.bilibili.com", "www.bilibili.com") - url = _remove_bilibili_link_tracing(url) - elif self.extractor == "youtube": - if "youtu.be" in url_hostname: - url = await _get_redirected_url(url) - url = _remove_youtube_link_tracing(url) - - logger.info(f"parsed video url: {url} for {self.extractor}") - return url - - async def get_video_info( - self, - url: str = None, - download: bool = None, - extractor: str = None, - audio_only: bool = None, - hd: bool = None, - ) -> dict: - """ - make a request to youtube-dl server to get video info - :return: video info dict - """ - if url is None: - url = self.url - if download is None: - download = self.download - if extractor is None: - extractor = self.extractor - if audio_only is None: - audio_only = self.audio_only - if hd is None: - hd = self.hd - async with httpx.AsyncClient() as client: - body = { - "url": url, - "download": download, - "extractor": extractor, - "audio_only": audio_only, - "hd": hd, - } - request_url = FILE_EXPORTER_URL + "/videoDownload" - logger.info(f"requesting video info from youtube-dl server: {body}") - if download is True: - logger.info(f"video downloading... it may take a while") - if hd is True: - logger.info(f"downloading HD video, it may take longer") - elif audio_only is True: - logger.info(f"downloading audio only") - logger.debug(f"downloading video timeout: {DOWNLOAD_VIDEO_TIMEOUT}") - resp = await client.post( - request_url, json=body, timeout=DOWNLOAD_VIDEO_TIMEOUT - ) - content_info = resp.json().get("content_info") - file_path = resp.json().get("file_path") - content_info["file_path"] = file_path - return content_info - - def _video_info_formatting(self, meta_info: dict): - self.title = meta_info["title"] - self.author = meta_info["author"] - self.author_url = meta_info["author_url"] - if len(meta_info["description"]) > 800: - meta_info["description"] = meta_info["description"][:800] + "..." - self.created = meta_info["upload_date"] - self.duration = meta_info["duration"] - self.text = video_info_template.render( - data={ - "url": self.url, - "title": self.title, - "author": self.author, - "author_url": self.author_url, - "duration": self.duration, - "created": self.created, - "playback_data": meta_info["playback_data"], - "description": meta_info["description"], - } - ) - self.content = self.text.replace("\n", "
") - if self.download: - media_type = "video" - if self.audio_only: - media_type = "audio" - self.media_files = [MediaFile(media_type, self.file_path, "")] - - @staticmethod - def _youtube_info_parse(video_info: dict) -> dict: - return { - "id": video_info["id"], - "title": video_info["title"], - "author": video_info["uploader"], - "author_url": video_info["uploader_url"] or video_info["channel_url"], - "description": video_info["description"], - "playback_data": f"视频播放量:{video_info['view_count']} 评论数:{video_info['comment_count']}", - "author_avatar": video_info["thumbnail"], - "upload_date": str(video_info["upload_date"]), - "duration": second_to_time(round(video_info["duration"])), - } - - @staticmethod - def _bilibili_info_parse(video_info: dict) -> dict: - return { - "id": video_info["id"], - "title": video_info["title"], - "author": video_info["uploader"], - "author_url": "https://space.bilibili.com/" - + str(video_info["uploader_id"]), - "author_avatar": video_info["thumbnail"], - "ext": video_info["ext"], - "description": video_info["description"], - "playback_data": f"视频播放量:{video_info['view_count']} 弹幕数:{video_info['comment_count']} 点赞数:{video_info['like_count']}", - "upload_date": unix_timestamp_to_utc(video_info["timestamp"]), - "duration": second_to_time(round(video_info["duration"])), - } diff --git a/app/services/inoreader/__init__.py b/app/services/inoreader/__init__.py deleted file mode 100644 index 55fc3c7..0000000 --- a/app/services/inoreader/__init__.py +++ /dev/null @@ -1,170 +0,0 @@ -from typing import Optional -from urllib.parse import quote - -import httpx -from bs4 import BeautifulSoup -import jmespath -from httpx import Response - -from app.models.metadata_item import MetadataItem, MediaFile, MessageType -from app.utils.network import HEADERS -from app.utils.logger import logger -from app.utils.parse import get_html_text_length -from app.config import ( - INOREADER_APP_ID, - INOREADER_APP_KEY, - INOREADER_EMAIL, - INOREADER_PASSWORD, -) - -INOREADER_CONTENT_URL = "https://www.inoreader.com/reader/api/0/stream/contents/" -TAG_PATH = "user/-/label/" -OTHER_PATH = "user/-/state/com.google/" -INOREADER_LOGIN_URL = "https://www.inoreader.com/accounts/ClientLogin" - - -class Inoreader(MetadataItem): - def __init__(self, url: str = None, data: dict = None, **kwargs): - if url: - self.url = url - if data: - self.title = data.get("title", "") - self.message = data.get("message", "") - self.author = data.get("author", "") - self.author_url = data.get("author_url", "") - self.category = data.get("category", "") - self.raw_content = data.get("content", "") - self.content = self.raw_content - if kwargs.get("category"): - self.category = kwargs["category"] - self.media_files = [] - self.message_type = MessageType.LONG - - def _from_data(self, data: dict): - self.title = data.get("title", "") - self.message = data.get("message", "") - self.author = data.get("author", "") - self.author_url = data.get("author_url", "") - self.category = data.get("category", "") - self.raw_content = data.get("content", "") - self.content = self.raw_content - - async def get_item(self, api: bool = False) -> dict: - if api: - data = await self.get_api_item_data() - self._resolve_media_files() - if get_html_text_length(self.content) < 400: - self.message_type = MessageType.SHORT - metadata_dict = self.to_dict() - metadata_dict["message"] = self.message - return metadata_dict - - def _resolve_media_files(self): - soup = BeautifulSoup(self.raw_content, "html.parser") - for img in soup.find_all("img"): - self.media_files.append(MediaFile(url=img["src"], media_type="image")) - img.extract() - for video in soup.find_all("video"): - self.media_files.append(MediaFile(url=video["src"], media_type="video")) - video.extract() - for tags in soup.find_all(["p", "span"]): - tags.unwrap() - self.text = str(soup) - self.text = '' + self.author + ": " + self.text - - @staticmethod - def get_stream_id( - stream_type: str = "broadcast", tag: str = None, feed: str = None - ) -> str: - if stream_type == "feed": - stream_id = feed - elif stream_type == "tag": - stream_id = TAG_PATH + tag - else: - stream_id = OTHER_PATH + stream_type - stream_id = quote(stream_id) - return stream_id - - @staticmethod - async def mark_all_as_read(stream_id: str, timestamp: int = 0) -> None: - request_url = "https://www.inoreader.com/reader/api/0/mark-all-as-read" - params = {"s": stream_id, "ts": timestamp} - resp = await Inoreader.get_api_info(url=request_url, params=params) - logger.debug(resp.text) - - @staticmethod - async def get_api_item_data( - stream_type: str = "broadcast", - tag: str = None, - feed: str = None, - params: dict = None, - ) -> Optional[dict | list]: - stream_id = Inoreader.get_stream_id(stream_type=stream_type, tag=tag, feed=feed) - request_url = INOREADER_CONTENT_URL + stream_id - default_params = { - "comments": 1, - "n": 10, - "r": "o", - "xt": "user/-/state/com.google/read", - } - if params: - default_params.update(params) - params = default_params - resp = await Inoreader.get_api_info(url=request_url, params=params) - logger.debug(resp.text) - data = resp.json() - data = await Inoreader.process_items_data(data) - return data - - @staticmethod - async def process_items_data(data: dict) -> Optional[dict | list]: - expression = """ - items[].{ - "aurl": canonical[0].href, - "title": title, - "author": origin.title, - "author_url": origin.htmlUrl, - "content": summary.content, - "category": categories[-1], - "message": comments[0].commentBody, - "timestamp": updated - } - """ - data = jmespath.search(expression, data) - for item in data: - item["category"] = item["category"].split("/")[-1] - return data - - @staticmethod - async def get_api_info( - url: str, - params=None, - ) -> Response: - async with httpx.AsyncClient() as client: - resp = await client.post( - INOREADER_LOGIN_URL, - params={ - "Email": INOREADER_EMAIL, - "Passwd": INOREADER_PASSWORD, - }, - ) - authorization = resp.text.split("\n")[2].split("=")[1] - - async with httpx.AsyncClient() as client: - headers = HEADERS - headers["Authorization"] = f"GoogleLogin auth={authorization}" - params = params or {} - params.update( - { - "AppId": INOREADER_APP_ID, - "AppKey": INOREADER_APP_KEY, - } - ) - resp = await client.get( - url=url, - params=params, - headers=headers, - ) - return resp - - diff --git a/app/services/inoreader/telegram_process.py b/app/services/inoreader/telegram_process.py deleted file mode 100644 index 975e894..0000000 --- a/app/services/inoreader/telegram_process.py +++ /dev/null @@ -1,102 +0,0 @@ -from typing import Union, Optional, Dict, Callable, Awaitable - -from app.config import TELEGRAM_CHANNEL_ID -from app.models.url_metadata import UrlMetadata -from app.services.inoreader import Inoreader -from app.services.scrapers.common import InfoExtractService -from app.utils.logger import logger -from app.utils.parse import get_url_metadata, get_bool - -default_telegram_channel_id = TELEGRAM_CHANNEL_ID[0] if TELEGRAM_CHANNEL_ID else None - -# Type alias for the message callback -MessageCallback = Callable[[dict, Union[int, str]], Awaitable[None]] - - -async def _default_message_callback(metadata_item: dict, chat_id: Union[int, str]) -> None: - """Default callback that sends via Telegram bot. Used when no callback is provided.""" - from app.services.telegram_bot import send_item_message - await send_item_message(metadata_item, chat_id=chat_id) - - -async def process_inoreader_data( - data: list, - use_inoreader_content: bool, - telegram_channel_id: Union[int, str] = default_telegram_channel_id, - stream_id: str = None, - message_callback: MessageCallback = None, -): - if message_callback is None: - message_callback = _default_message_callback - - for item in data: - url_type_item = await get_url_metadata(item["aurl"]) - url_type_dict = url_type_item.to_dict() - logger.debug(f"ino original: {use_inoreader_content}") - if ( - use_inoreader_content is True - or url_type_dict["content_type"] == "unknown" - ): - is_video = url_type_dict["content_type"] == "video" - content_type = url_type_dict["content_type"] if is_video else "social_media" - source = url_type_dict["source"] if is_video else "inoreader" - url_metadata = UrlMetadata( - url=item["aurl"], - content_type=content_type, - source=source, - ) - metadata_item = InfoExtractService( - url_metadata=url_metadata, - data=item, - store_document=True, - category=item["category"], - ) - else: - metadata_item = InfoExtractService( - url_metadata=url_type_item, - data=item, - store_document=True, - ) - message_metadata_item = await metadata_item.get_item() - await message_callback(message_metadata_item, telegram_channel_id) - if stream_id: - await Inoreader.mark_all_as_read( - stream_id=stream_id, timestamp=item["timestamp"] - 1 - ) - - -async def get_inoreader_item_async( - data: Optional[Dict] = None, - trigger: bool = False, - params: Optional[Dict] = None, - message_callback: MessageCallback = None, -) -> None: - stream_id = None - use_inoreader_content = True - telegram_channel_id = default_telegram_channel_id - if trigger and params and not data: - logger.debug(f"params:{params}") - use_inoreader_content = get_bool(params.get("useInoreaderContent"), True) - stream_type = params.get("streamType", "broadcast") - telegram_channel_id = params.get("channelId", default_telegram_channel_id) - tag = params.get("tag", None) - feed = params.get("feed", None) - the_remaining_params = { - k: v - for k, v in params.items() - if k not in ["streamType", "channelId", "tag", "feed"] - } - data = await Inoreader.get_api_item_data( - stream_type=stream_type, tag=tag, params=the_remaining_params, feed=feed - ) - if not data: - return - stream_id = Inoreader.get_stream_id(stream_type=stream_type, tag=tag, feed=feed) - if type(data) is dict: - data = [data] - await process_inoreader_data( - data, use_inoreader_content, telegram_channel_id, stream_id, - message_callback=message_callback, - ) - if stream_id: - await Inoreader.mark_all_as_read(stream_id=stream_id) diff --git a/app/services/scrapers/__init__.py b/app/services/scrapers/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/app/services/scrapers/bluesky/__init__.py b/app/services/scrapers/bluesky/__init__.py deleted file mode 100644 index cb11b74..0000000 --- a/app/services/scrapers/bluesky/__init__.py +++ /dev/null @@ -1,45 +0,0 @@ -import traceback -from dataclasses import dataclass -from urllib.parse import urlparse -from typing import Dict, Optional, Any - -import httpx -import jmespath - -from app.models.metadata_item import MetadataItem, MediaFile, MessageType -from app.utils.parse import get_html_text_length, wrap_text_into_html - - -@dataclass -class Bluesky(MetadataItem): - cid: str = "" - author_did: str = "" - retweet_post: Optional["Bluesky"] = None - - @staticmethod - def from_dict(obj: Any) -> "Bluesky": - bluesky_item = MetadataItem.from_dict(obj) - bluesky_item.cid = obj.get("cid") - bluesky_item.author_did = obj.get("author_did") - return Bluesky( - url=bluesky_item.url, - title=bluesky_item.title, - author=bluesky_item.author, - author_url=bluesky_item.author_url, - telegraph_url=bluesky_item.telegraph_url, - text=bluesky_item.text, - content=bluesky_item.content, - media_files=bluesky_item.media_files, - category=bluesky_item.category, - message_type=bluesky_item.message_type, - cid=bluesky_item.cid, - author_did=bluesky_item.author_did, - ) - - def to_dict(self) -> dict: - result: dict = super().to_dict() - result["cid"] = self.cid - result["author_did"] = self.author_did - if self.retweet_post: - result["retweet_post"] = self.retweet_post.to_dict() - return result diff --git a/app/services/scrapers/bluesky/config.py b/app/services/scrapers/bluesky/config.py deleted file mode 100644 index 3183639..0000000 --- a/app/services/scrapers/bluesky/config.py +++ /dev/null @@ -1,3 +0,0 @@ -BLUESKY_HOST = "https://bsky.app" - -BLUESKY_MAX_LENGTH = 800 diff --git a/app/services/scrapers/bluesky/scraper.py b/app/services/scrapers/bluesky/scraper.py deleted file mode 100644 index 06f09d9..0000000 --- a/app/services/scrapers/bluesky/scraper.py +++ /dev/null @@ -1,191 +0,0 @@ -from typing import Optional -from urllib.parse import urlparse - -from atproto import AsyncClient, IdResolver, AtUri -from atproto_client.models.app.bsky.embed.record import ViewRecord -from atproto_client.models.app.bsky.feed.defs import ThreadViewPost, PostView - -from app.config import JINJA2_ENV -from app.models.metadata_item import MediaFile, MessageType -from app.services.scrapers.scraper import Scraper, DataProcessor -from app.services.scrapers.bluesky import Bluesky -from app.services.scrapers.bluesky.config import BLUESKY_HOST, BLUESKY_MAX_LENGTH -from app.utils.logger import logger -from app.utils.parse import wrap_text_into_html - -telegram_text_template = JINJA2_ENV.get_template("bluesky_telegram_text.jinja2") -content_template = JINJA2_ENV.get_template("bluesky_content.jinja2") - - -class BlueskyPost: - def __init__(self, bluesky_url: str): - self.url: str = bluesky_url - bluesky_url_parser = urlparse(bluesky_url) - self.bluesky_host: Optional[str] = bluesky_url_parser.netloc - bluesky_path = bluesky_url_parser.path - self.handle: Optional[str] = bluesky_path.split("/")[2] - self.post_rkey: Optional[str] = bluesky_path.split("/")[-1] - self.did: str = BlueskyScraper.id_resolver.handle.resolve(self.handle) - - -class BlueskyDataProcessor(DataProcessor): - - def __init__(self, url: str, bluesky_thread_data: ThreadViewPost): - self.url: str = url - self.bluesky_thread_data: ThreadViewPost = bluesky_thread_data - logger.debug( - f"BlueskyDataProcessor initialized with url: {url}\n and bluesky_thread_data: \n{bluesky_thread_data}") - self._data: dict = {} - - async def get_item(self) -> dict: - await self.process_data() - bluesky_item = Bluesky.from_dict(self._data) - return bluesky_item.to_dict() - pass - - async def process_data(self): - await self._resolve_thread_data() - - async def _resolve_thread_data(self) -> None: - base_post_view_data = await BlueskyDataProcessor._resolve_single_post_data(self.bluesky_thread_data.post) - base_post_view_data["url"] = self.url - - post_author_did = base_post_view_data["author_did"] - - parent_posts_text = "" - parent_posts_content = "" - parent_posts_media_files = [] - replies_posts_text = "" - replies_posts_content = "" - replies_posts_media_files = [] - # get post data from the parent posts whose author is the same as the base post author - if self.bluesky_thread_data.parent: - parent_posts_data = [] - parent_post_view = self.bluesky_thread_data.parent - await BlueskyDataProcessor._get_parent_posts_data(parent_post_view, parent_posts_data) - if parent_posts_data: - for post_data in parent_posts_data: - parent_posts_text += "\n" + post_data["text"] - parent_posts_content += post_data["content"] - parent_posts_media_files.extend(post_data["media_files"]) - # get post data from the replies whose author is the same as the base post author - if self.bluesky_thread_data.replies: - replies_posts_data = [] - for post_thread_view in self.bluesky_thread_data.replies: - post_view = post_thread_view.post - if post_author_did == post_view.author.did: - post_data = await BlueskyDataProcessor._resolve_single_post_data(post_view) - replies_posts_data.append(post_data) - if replies_posts_data: - for post_data in replies_posts_data: - replies_posts_text += "\n" + post_data["text"] - replies_posts_content += post_data["content"] - replies_posts_media_files.extend(post_data["media_files"]) - base_post_view_data["text"] = parent_posts_text + base_post_view_data["text"] + replies_posts_text - base_post_view_data["content"] = parent_posts_content + base_post_view_data["content"] + replies_posts_content - base_post_view_data["media_files"] = parent_posts_media_files + base_post_view_data[ - "media_files"] + replies_posts_media_files - - if len(base_post_view_data["text"]) > BLUESKY_MAX_LENGTH: - base_post_view_data["message_type"] = MessageType.LONG - else: - base_post_view_data["message_type"] = MessageType.SHORT - - self._data = base_post_view_data - - @staticmethod - async def _get_parent_posts_data(parent_post_view: ThreadViewPost, parent_posts_data_list: list) -> None: - parent_post_data = await BlueskyDataProcessor._resolve_single_post_data(parent_post_view.post) - parent_posts_data_list.append(parent_post_data) - if parent_post_view.parent: - await BlueskyDataProcessor._get_parent_posts_data(parent_post_view.parent, parent_posts_data_list) - - @staticmethod - async def _resolve_single_post_data(post_data: PostView) -> dict: - at_uri = AtUri.from_str(post_data.uri) - url = BLUESKY_HOST + "/profile/" + post_data.author.handle + "/post/" + at_uri.rkey - author = post_data.author.display_name - author_url = BLUESKY_HOST + "/profile/" + post_data.author.handle - author_did = post_data.author.did - text = post_data.record.text - created_at = post_data.record.created_at - - parsed_post_data = { - "url": url, - "title": author + "\'s Bluesky post", - "author": author, - "author_url": author_url, - "text": text, - "category": "bluesky", - "media_files": [], - "created_at": created_at, - "author_did": author_did, - } - - media_files = [] - if post_data.embed is not None: - # images and videos - if "images" in post_data.embed.__dict__: - for image in post_data.embed.images: - img_url = image.fullsize - img_item = { - "media_type": "image", - "url": img_url, - "caption": "", - } - media_files.append(img_item) - # TODO: handle video, which is in m3u8 format that needs to be downloaded and converted to mp4 - parsed_post_data["media_files"] = media_files - # retweet post - if "record" in post_data.embed.__dict__ and post_data.embed.record is ViewRecord: - retweet_post_data = await BlueskyDataProcessor._resolve_single_post_data(post_data.embed.record) - parsed_post_data["retweet_post"] = retweet_post_data - - content = await BlueskyDataProcessor._generate_html_content(parsed_post_data) - text = await BlueskyDataProcessor._generate_telegram_text(parsed_post_data) - parsed_post_data["content"] = content - parsed_post_data["text"] = text - - return parsed_post_data - - @staticmethod - async def _generate_html_content(data: dict) -> str: - html_content_text = wrap_text_into_html(data["text"]) - data["html_content_text"] = html_content_text - content = content_template.render(data=data) - return content - - @staticmethod - async def _generate_telegram_text(data: dict) -> str: - text = telegram_text_template.render(data=data) - return text - - -class BlueskyScraper(Scraper): - id_resolver = IdResolver() - - def __init__(self, username: Optional[str] = None, password: Optional[str] = None): - self.client: AsyncClient = AsyncClient() - self.username: Optional[str] = username - self.password: Optional[str] = password - self.did: Optional[str] = None - - async def init(self): - if self.username and self.password: - await self.client.login(self.username, self.password) - # self.did = await self.client.com - - async def get_processor_by_url(self, url: str) -> BlueskyDataProcessor: - bluesky_post = BlueskyPost(url) - bluesky_post_data = await self._request_post_data(bluesky_post) - return BlueskyDataProcessor(url, bluesky_post_data) - - async def _request_post_data(self, bluesky_post: BlueskyPost) -> ThreadViewPost: - profile_identify = bluesky_post.did or bluesky_post.handle - try: - post_data = await self.client.get_post(profile_identify=profile_identify, post_rkey=bluesky_post.post_rkey) - post_uri = post_data.uri - post_thread_data = await self.client.get_post_thread(uri=post_uri) - return post_thread_data.thread - except Exception as e: - logger.error(f"Error while getting post data: {e}") diff --git a/app/services/scrapers/common.py b/app/services/scrapers/common.py deleted file mode 100644 index bdb1a42..0000000 --- a/app/services/scrapers/common.py +++ /dev/null @@ -1,114 +0,0 @@ -from typing import Optional, Any - -from app.models.database_model import Metadata -from app.models.url_metadata import UrlMetadata -from app.models.metadata_item import MessageType -from app.services import ( - telegraph, - inoreader -) -from app.services.file_export import video_download, document_export -from app.services.scrapers import twitter, wechat, reddit, weibo, zhihu, douban, instagram, xiaohongshu, threads -from app.services.scrapers.scraper_manager import ScraperManager -from app.database import save_instances -from app.utils.logger import logger -from app.config import DATABASE_ON - - -class InfoExtractService(object): - service_classes: dict = { - "twitter": twitter.Twitter, - "threads": threads.Threads, - "reddit": reddit.Reddit, - "weibo": weibo.Weibo, - "wechat": wechat.Wechat, - "instagram": instagram.Instagram, - "douban": douban.Douban, - "zhihu": zhihu.Zhihu, - "xiaohongshu": xiaohongshu.Xiaohongshu, - "youtube": video_download.VideoDownloader, - "bilibili": video_download.VideoDownloader, - "inoreader": inoreader.Inoreader, - } - - def __init__( - self, - url_metadata: UrlMetadata, - data: Any = None, - store_database: Optional[bool] = DATABASE_ON, - store_telegraph: Optional[bool] = True, - store_document: Optional[bool] = False, - **kwargs, - ): - url_metadata = url_metadata.to_dict() - self.url = url_metadata["url"] - self.content_type = url_metadata["content_type"] - self.source = url_metadata["source"] - self.data = data - self.kwargs = kwargs - self.store_database = store_database - self.store_telegraph = store_telegraph - self.store_document = store_document - - @property - def category(self) -> str: - return self.source - - async def get_item(self, metadata_item: Optional[dict] = None) -> dict: - if self.content_type == "video": - if not self.kwargs.get("category"): - self.kwargs["category"] = self.category - if not metadata_item: - try: - if self.category in ["bluesky", "weibo", "other", "unknown"]: # it is a workaround before the code refactor - await ScraperManager.init_scraper(self.category) - item_data_processor = await ScraperManager.scrapers[self.category].get_processor_by_url(url=self.url) - metadata_item = await item_data_processor.get_item() - else: - scraper_item = InfoExtractService.service_classes[self.category]( - url=self.url, data=self.data, **self.kwargs - ) - metadata_item = await scraper_item.get_item() - except Exception as e: - logger.error(f"Error while getting item: {e}") - raise e - logger.info(f"Got metadata item") - logger.debug(metadata_item) - metadata_item = await self.process_item(metadata_item) - return metadata_item - - async def process_item(self, metadata_item: dict) -> dict: - if metadata_item.get("message_type") == MessageType.LONG: - self.store_telegraph = True - logger.info("message type is long, store in telegraph") - if self.store_telegraph: - telegraph_item = telegraph.Telegraph.from_dict(metadata_item) - try: - telegraph_url = await telegraph_item.get_telegraph() - except Exception as e: - logger.error(f"Error while getting telegraph: {e}") - telegraph_url = "" - metadata_item["telegraph_url"] = telegraph_url - if self.store_document or ( - not self.store_document and metadata_item["telegraph_url"] == "" - ): - logger.info("store in document") - try: - pdf_document = document_export.pdf_export.PdfExport( - title=metadata_item["title"], html_string=metadata_item["content"] - ) - output_filename = await pdf_document.export(method="file") - metadata_item["media_files"].append( - { - "media_type": "document", - "url": output_filename, - "caption": "", - } - ) - except Exception as e: - logger.error(f"Error while exporting document: {e}") - metadata_item["title"] = metadata_item["title"].strip() - if self.store_database: - logger.info("store in database") - await save_instances(Metadata.model_construct(**metadata_item)) - return metadata_item diff --git a/app/services/scrapers/douban/__init__.py b/app/services/scrapers/douban/__init__.py deleted file mode 100644 index feec305..0000000 --- a/app/services/scrapers/douban/__init__.py +++ /dev/null @@ -1,230 +0,0 @@ -import re -from typing import Dict, Optional, Any -from enum import Enum -from urllib.parse import urlparse - -from bs4 import BeautifulSoup -from lxml import etree - -from app.utils.parse import get_html_text_length, wrap_text_into_html -from app.utils.network import get_selector, HEADERS -from app.models.metadata_item import MetadataItem, MediaFile, MessageType -from app.config import JINJA2_ENV - -SHORT_LIMIT = 600 - -short_text_template = JINJA2_ENV.get_template("douban_short_text.jinja2") -content_template = JINJA2_ENV.get_template("douban_content.jinja2") - - -class DoubanType(str, Enum): - MOVIE_REVIEW = "movie_review" - BOOK_REVIEW = "book_review" - NOTE = "note" - STATUS = "status" - GROUP = "group" - UNKNOWN = "unknown" - - -class Douban(MetadataItem): - item_title: Optional[str] - item_url: Optional[str] - group_name: Optional[str] - group_url: Optional[str] - douban_type: DoubanType - text_group: Optional[str] - raw_content: Optional[str] - date: Optional[str] - - def __init__(self, url: str, data: Optional[Any] = None, **kwargs): - # metadata fields - self.url = url - self.title = "" - self.author = "" - self.author_url = "" - self.text = "" - self.content = "" - self.media_files = [] - self.category = "douban" - self.message_type = MessageType.SHORT - # auxiliary fields - self.item_title: Optional[str] = None - self.item_url: Optional[str] = None - self.group_name: Optional[str] = None - self.group_url: Optional[str] = None - self.douban_type: DoubanType = DoubanType.UNKNOWN - self.text_group: Optional[str] = None - self.raw_content: Optional[str] = None - self.date: Optional[str] = None - # reqeust fields - self.headers = HEADERS - self.headers["Cookie"] = kwargs.get("cookie", "") - - async def get_item(self) -> dict: - await self.get_douban() - return self.to_dict() - - async def get_douban(self) -> None: - self.check_douban_type() - await self.get_douban_item() - - def check_douban_type(self): - urlparser = urlparse(self.url) - host = urlparser.netloc - path = urlparser.path - if host.find("m.douban") != -1: # parse the m.douban url - host = host.replace("m.douban", "douban") - if path.startswith("/movie/review"): - self.douban_type = DoubanType.MOVIE_REVIEW - host = host.replace("douban", "movie.douban") - path = path.replace("/movie/", "/") - elif path.startswith("/book/review"): - self.douban_type = DoubanType.BOOK_REVIEW - host = host.replace("douban", "book.douban") - path = path.replace("/book/", "/") - if path.startswith("/note/"): - self.douban_type = DoubanType.NOTE - elif path.startswith("/status/") or re.match(r"/people/\d+/status/\d+", path): - self.douban_type = DoubanType.STATUS - elif path.startswith("/group/topic/"): - self.douban_type = DoubanType.GROUP - elif host.startswith("movie.douban") and path.startswith("/review/"): - self.douban_type = DoubanType.MOVIE_REVIEW - elif host.startswith("book.douban") and path.startswith("/review/"): - self.douban_type = DoubanType.BOOK_REVIEW - else: - self.douban_type = DoubanType.UNKNOWN - self.url = f"https://{host}{path}" - - async def get_douban_item(self): - function_dict = { - DoubanType.MOVIE_REVIEW: self._get_douban_movie_review, - DoubanType.BOOK_REVIEW: self._get_douban_book_review, - DoubanType.NOTE: self._get_douban_note, - DoubanType.STATUS: self._get_douban_status, - DoubanType.GROUP: self._get_douban_group_article, - DoubanType.UNKNOWN: None, - } - await function_dict[self.douban_type]() - short_text = self._douban_short_text_process() - if short_text.endswith("\n"): - short_text = short_text[:-1] - data = self.__dict__ - data["short_text"] = short_text - self.text = short_text_template.render(data=data) - self.raw_content = self.raw_content_to_html(self.raw_content) - self.content = wrap_text_into_html( - content_template.render(data=data), is_html=True - ) - if get_html_text_length(self.content) > SHORT_LIMIT: - self.message_type = MessageType.LONG - else: - self.message_type = MessageType.SHORT - - async def _get_douban_movie_review(self): - selector = await get_selector(url=self.url, headers=self.headers) - self.title = selector.xpath('string(//div[@id="content"]//h1//span)') - self.author = selector.xpath('string(//header[@class="main-hd"]//span)') - self.author_url = selector.xpath('string(//header[@class="main-hd"]/a/@href)') - self.item_title = selector.xpath('string(//header[@class="main-hd"]/a[2])') - self.item_url = selector.xpath('string(//header[@class="main-hd"]/a[2]/@href)') - self.raw_content = str( - etree.tostring( - selector.xpath("//div[contains(@class,'review-content')]")[0], - encoding="utf-8", - ), - encoding="utf-8", - ) - - async def _get_douban_book_review(self): - selector = await get_selector(self.url, headers=self.headers) - self.title = selector.xpath('string(//div[@id="content"]//h1//span)') - self.author = selector.xpath('string(//header[@class="main-hd"]//span)') - self.author_url = selector.xpath('string(//header[@class="main-hd"]/a/@href)') - self.item_title = selector.xpath('string(//header[@class="main-hd"]/a[2])') - self.item_url = selector.xpath('string(//header[@class="main-hd"]/a[2]/@href)') - self.raw_content = str( - etree.tostring( - selector.xpath('//div[@id="link-report"]')[0], encoding="utf-8" - ), - encoding="utf-8", - ) - - async def _get_douban_note(self): - selector = await get_selector(self.url, headers=self.headers) - self.title = selector.xpath("string(//h1)") - self.author = selector.xpath('string(//div[@class="content"]/a)') - self.author_url = selector.xpath('string(//div[@class="content"]/a/@href)') - self.raw_content = str( - etree.tostring( - selector.xpath('//div[@id="link-report"]')[0], encoding="utf-8" - ), - encoding="utf-8", - ) - - async def _get_douban_status(self): - selector = await get_selector(self.url, headers=self.headers) - self.author = selector.xpath('string(//div[@class="content"]/a)') - self.author_url = selector.xpath('string(//div[@class="content"]/a/@href)') - self.title = self.author + "的广播" - self.raw_content = ( - str( - etree.tostring( - selector.xpath('//div[@class="status-saying"]')[0], encoding="utf-8" - ), - encoding="utf-8", - ) - .replace("
", "") - .replace("
", "") - .replace(">+<", "><") - .replace(" ", "
") - ) - - async def _get_douban_group_article(self): - selector = await get_selector(self.url, headers=self.headers) - self.title = selector.xpath('string(//div[@id="content"]//h1)') - self.title = self.title.replace("\n", "").strip() - self.author = selector.xpath('string(//span[@class="from"]//a)') - self.author_url = selector.xpath('string(//span[@class="from"]//a/@href)') - self.group_name = selector.xpath( - 'string(//div[@id="g-side-info"]//div[@class="title"]/a)' - ) - self.group_url = selector.xpath( - 'string(//div[@id="g-side-info"]//div[@class="title"]/a/@href)' - ) - self.raw_content = str( - etree.tostring( - selector.xpath('//div[@id="link-report"]')[0], encoding="utf-8" - ), - encoding="utf-8", - ) - - def _douban_short_text_process(self) -> str: - soup = BeautifulSoup(self.raw_content, "html.parser") - for img in soup.find_all("img"): - media_item = {"media_type": "image", "url": img["src"], "caption": ""} - self.media_files.append(MediaFile.from_dict(media_item)) - img.extract() - for item in soup.find_all(["p", "span", "div"]): - item.unwrap() - for item in soup.find_all(["link", "script"]): - item.decompose() - for item in soup.find_all("a"): - if item.get("title") == "查看原图": - item.decompose() - short_text = str(soup) - short_text = re.sub(r"\n{2,}", "\n", short_text) - short_text = re.sub(r"", "\n", short_text) - return short_text - - @staticmethod - def raw_content_to_html(raw_content: str) -> str: - # Split the text into paragraphs based on double newlines - print(raw_content) - paragraphs = raw_content.split('
\n') - # Wrap each paragraph with

tags - print(paragraphs) - html_paragraphs = [f'

{paragraph.strip()}

' for paragraph in paragraphs] - # Join the paragraphs to form the final HTML string - html_string = ''.join(html_paragraphs) - return html_string diff --git a/app/services/scrapers/general/__init__.py b/app/services/scrapers/general/__init__.py deleted file mode 100644 index 94c0402..0000000 --- a/app/services/scrapers/general/__init__.py +++ /dev/null @@ -1,40 +0,0 @@ -from dataclasses import dataclass -from typing import Any - -from app.models.metadata_item import MetadataItem - - -@dataclass -class GeneralItem(MetadataItem): - """ - GeneralItem: Data class for scraped content from general webpage scrapers. - """ - id: str = "" - raw_content: str = "" - scraper_type: str = "" # Which scraper was used (e.g., "firecrawl", "zyte", etc.) - - @staticmethod - def from_dict(obj: Any) -> "GeneralItem": - metadata_item = MetadataItem.from_dict(obj) - return GeneralItem( - url=metadata_item.url, - title=metadata_item.title, - author=metadata_item.author, - author_url=metadata_item.author_url, - telegraph_url=metadata_item.telegraph_url, - text=metadata_item.text, - content=metadata_item.content, - media_files=metadata_item.media_files, - category=metadata_item.category, - message_type=metadata_item.message_type, - id=obj.get("id", ""), - raw_content=obj.get("raw_content", ""), - scraper_type=obj.get("scraper_type", ""), - ) - - def to_dict(self) -> dict: - result: dict = super().to_dict() - result["id"] = self.id - result["raw_content"] = self.raw_content - result["scraper_type"] = self.scraper_type - return result diff --git a/app/services/scrapers/general/base.py b/app/services/scrapers/general/base.py deleted file mode 100644 index 1ab9360..0000000 --- a/app/services/scrapers/general/base.py +++ /dev/null @@ -1,208 +0,0 @@ -import hashlib -from abc import abstractmethod -from typing import Optional -from urllib.parse import urlparse - -from bs4 import BeautifulSoup, Doctype -from openai import AsyncOpenAI -from openai.types.chat import ChatCompletionSystemMessageParam, ChatCompletionUserMessageParam - -from app.config import OPENAI_API_KEY -from app.models.metadata_item import MediaFile, MessageType -from app.services.scrapers.scraper import Scraper, DataProcessor -from app.services.scrapers.general import GeneralItem -from app.utils.parse import get_html_text_length, wrap_text_into_html -from app.utils.logger import logger - -GENERAL_TEXT_LIMIT = 800 - -DEFAULT_OPENAI_MODEL = "gpt-5-nano" - -# System prompt for LLM to extract article content -ARTICLE_EXTRACTION_PROMPT = """You are an expert content extractor. Your task is to extract the main article content from the provided HTML. - -Instructions: -1. Identify and extract ONLY the main article/post content -2. Remove navigation, headers, footers, sidebars, ads, comments, and other non-article elements -3. Preserve the article's structure (headings, paragraphs, lists, etc.) -4. Keep important formatting like bold, italic, links, and images -5. Return clean HTML containing only the article content -6. If you cannot identify the main content, return the original HTML unchanged -7. After all of the above, remove some basic HTML tags like , ,