Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions apps/async-worker/async_worker/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ def parse_redis_url(url: str) -> RedisSettings:
port=parsed.port or 6379,
database=int(parsed.path.lstrip("/") or 0),
password=parsed.password,
conn_timeout=60,
conn_retries=5,
conn_retry_delay=1,
)


Expand All @@ -44,5 +47,12 @@ class WorkerSettings:
# Maximum concurrent jobs
max_jobs = 10

# Retry jobs on transient failures (e.g. Redis timeout after task completes)
retry_jobs = True
max_tries = 3
Comment on lines +50 to +52
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for deduplication or job_id tracking in outbox consumer
rg -n -C3 'job_id|dedupe|duplicate|idempoten' apps/telegram-bot/core/services/outbox_consumer.py

Repository: aturret/FastFetchBot

Length of output: 1128


🏁 Script executed:

#!/bin/bash
# 1. Read the outbox.push() function to confirm it uses simple lpush
echo "=== Checking outbox.push() implementation ===" 
sed -n '19,42p' apps/async-worker/async_worker/services/outbox.py

# 2. Check the full main.py context around lines 50-52
echo -e "\n=== Checking main.py retry settings context ===" 
sed -n '40,65p' apps/async-worker/async_worker/main.py

# 3. Search for any SET-based or job_id tracking in outbox or redis logic
echo -e "\n=== Searching for idempotency mechanisms ===" 
rg -n 'SET|setnx|set_if_not_exists|job_id.*track' apps/async-worker/async_worker/services/ -A2

Repository: aturret/FastFetchBot

Length of output: 1635


Duplicate outbox entries possible with retry_jobs=True.

Enabling retry_jobs=True with max_tries=3 can cause duplicate messages. The outbox.push() function (apps/async-worker/async_worker/services/outbox.py) performs a simple lpush without idempotency checks. If a task succeeds (pushes to outbox) but crashes before ARQ marks the job complete, the retry will push a duplicate entry that the consumer will process.

The outbox consumer (apps/telegram-bot/core/services/outbox_consumer.py) extracts the job_id from the payload but uses it only for logging, with no deduplication logic.

Fix by:

  1. Adding idempotency to outbox.push() using a Redis SET to track processed job IDs, or
  2. Using retry_jobs=False and implementing manual retry logic with idempotency guards, or
  3. Accepting duplicates and implementing deduplication in the consumer
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/async-worker/async_worker/main.py` around lines 50 - 52, The current
config enables retry_jobs=True with max_tries=3 which can produce duplicate
outbox entries because outbox.push() (in services/outbox.py) does a plain lpush
and the outbox consumer (apps/telegram-bot/core/services/outbox_consumer.py)
only logs job_id without deduplication; fix by adding idempotency: modify
outbox.push() to atomically check-and-set a Redis SET (or Redis SETNX key per
job_id) before performing lpush so the same job_id is not pushed twice, or
alternatively set retry_jobs=False in main.py (disable retry_jobs / adjust
max_tries) and implement manual retries that use the same idempotency guard, or
implement deduplication in outbox_consumer.py by ignoring payloads whose job_id
is already recorded in a Redis set; update references to retry_jobs, max_tries,
outbox.push(), and the consumer's job_id handling accordingly.


# Keep results for 1 hour
keep_result = 3600

# Health-check the Redis connection every 30s to prevent stale connections
health_check_interval = 30
12 changes: 9 additions & 3 deletions apps/async-worker/async_worker/services/outbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,24 @@ async def push(
metadata_item: dict | None = None,
message_id: int | None = None,
error: str | None = None,
bot_id: int | str | None = None,
) -> None:
"""Push a result payload to the Redis outbox queue."""
"""Push a result payload to the per-bot Redis outbox queue.

The queue key is ``{OUTBOX_QUEUE_KEY}:{bot_id}`` when *bot_id* is provided,
falling back to the plain ``OUTBOX_QUEUE_KEY`` for backward compatibility.
"""
r = await get_outbox_redis()
queue_key = f"{OUTBOX_QUEUE_KEY}:{bot_id}" if bot_id is not None else OUTBOX_QUEUE_KEY
payload = {
"job_id": job_id,
"chat_id": chat_id,
"message_id": message_id,
"metadata_item": metadata_item,
"error": error,
}
await r.lpush(OUTBOX_QUEUE_KEY, json.dumps(payload, ensure_ascii=False))
logger.info(f"Pushed result to outbox: job_id={job_id}, error={error is not None}")
await r.lpush(queue_key, json.dumps(payload, ensure_ascii=False))
logger.info(f"Pushed result to outbox: job_id={job_id}, queue={queue_key}, error={error is not None}")


async def close() -> None:
Expand Down
7 changes: 6 additions & 1 deletion apps/async-worker/async_worker/tasks/scrape.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ async def scrape_and_enrich(
message_id: int | None = None,
source: str = "",
content_type: str = "",
bot_id: int | str | None = None,
store_telegraph: bool | None = None,
store_document: bool | None = None,
**kwargs,
Expand All @@ -31,6 +32,8 @@ async def scrape_and_enrich(
message_id: Optional Telegram message ID for reply threading.
source: URL source platform (e.g. "twitter", "weibo").
content_type: Content type (e.g. "social_media", "video").
bot_id: Telegram bot user ID. Used to route results to the correct
bot's outbox queue (``scrape:outbox:{bot_id}``).
store_telegraph: Override Telegraph publishing flag.
store_document: Override PDF export flag.
**kwargs: Extra arguments passed to the scraper.
Expand Down Expand Up @@ -64,12 +67,13 @@ async def scrape_and_enrich(

logger.info(f"[{job_id}] Scrape completed successfully")

# Push to outbox
# Push to outbox (per-bot queue key)
await outbox.push(
job_id=job_id,
chat_id=chat_id,
message_id=message_id,
metadata_item=metadata_item,
bot_id=bot_id,
)

return {"job_id": job_id, "status": "success"}
Expand All @@ -84,6 +88,7 @@ async def scrape_and_enrich(
chat_id=chat_id,
message_id=message_id,
error=str(e),
bot_id=bot_id,
)

return {"job_id": job_id, "status": "error", "error": str(e)}
2 changes: 1 addition & 1 deletion apps/telegram-bot/core/handlers/url_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async def _fetch_and_send(
await queue_client.enqueue_scrape(
url=url,
chat_id=chat_id,
message_id=message_id,
message_id=message_id if message_id is not None else (message.message_id if message else None),
source=source,
content_type=content_type,
**kwargs,
Expand Down
20 changes: 14 additions & 6 deletions apps/telegram-bot/core/queue_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from fastfetchbot_shared.utils.logger import logger

_arq_redis: ArqRedis | None = None
_bot_id: int | None = None


def _parse_redis_url(url: str) -> RedisSettings:
Expand All @@ -21,20 +22,26 @@ def _parse_redis_url(url: str) -> RedisSettings:
)


async def init() -> None:
"""Initialize the ARQ Redis connection pool."""
global _arq_redis
async def init(bot_id: int) -> None:
"""Initialize the ARQ Redis connection pool.

Args:
bot_id: Telegram bot user ID (from application.bot.id).
"""
global _arq_redis, _bot_id
if _arq_redis is None:
_bot_id = bot_id
_arq_redis = await create_pool(_parse_redis_url(ARQ_REDIS_URL))
logger.info("ARQ queue client initialized")
logger.info(f"ARQ queue client initialized for bot_id={bot_id}")


async def close() -> None:
"""Close the ARQ Redis connection pool."""
global _arq_redis
global _arq_redis, _bot_id
if _arq_redis is not None:
await _arq_redis.aclose()
_arq_redis = None
_bot_id = None
logger.info("ARQ queue client closed")


Expand All @@ -50,7 +57,7 @@ async def enqueue_scrape(

Returns the job_id (UUID string).
"""
if _arq_redis is None:
if _arq_redis is None or _bot_id is None:
raise RuntimeError("Queue client not initialized. Call queue_client.init() first.")

job_id = str(uuid.uuid4())
Expand All @@ -62,6 +69,7 @@ async def enqueue_scrape(
message_id=message_id,
source=source,
content_type=content_type,
bot_id=_bot_id,
**kwargs,
)
logger.info(f"Enqueued scrape job: job_id={job_id}, url={url}")
Expand Down
7 changes: 4 additions & 3 deletions apps/telegram-bot/core/services/bot_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,10 @@ async def startup() -> None:
from core import queue_client
from core.services import outbox_consumer

await queue_client.init()
await outbox_consumer.start()
logger.info("Queue mode enabled: ARQ client and outbox consumer started")
bot_id = application.bot.id
await queue_client.init(bot_id=bot_id)
await outbox_consumer.start(bot_id=bot_id)
logger.info(f"Queue mode enabled: ARQ client and outbox consumer started (bot_id={bot_id})")

if application.post_init:
await application.post_init()
Expand Down
11 changes: 7 additions & 4 deletions apps/telegram-bot/core/services/message_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,19 @@ def _get_application():


async def send_item_message(
data: dict, chat_id: Union[int, str] = None, message: Message = None
data: dict, chat_id: Union[int, str] = None, message: Message = None,
message_id: int = None,
) -> None:
"""
:param data: (dict) metadata of the item
:param chat_id: (int) any chat id for sending
:param message: (Message) any message to reply
:param message_id: (int) bare message ID for reply threading (used when Message object is unavailable, e.g. outbox consumer)
:return:
"""
application = _get_application()
logger.debug(f"send_item_message: {data}, {chat_id}, {message}")
_reply_to = message.message_id if message else message_id
if not chat_id and not message:
raise ValueError("must provide chat_id or message")
if (
Expand Down Expand Up @@ -100,7 +103,7 @@ async def send_item_message(
parse_mode=ParseMode.HTML,
caption=caption_text,
write_timeout=TELEBOT_WRITE_TIMEOUT,
reply_to_message_id=message.message_id if message else None,
reply_to_message_id=_reply_to,
)
if sent_media_files_message is tuple:
reply_to_message_id = sent_media_files_message[0].message_id
Expand All @@ -112,7 +115,7 @@ async def send_item_message(
chat_id=chat_id,
text=caption_text,
parse_mode=ParseMode.HTML,
reply_to_message_id=message.message_id if message else None,
reply_to_message_id=_reply_to,
disable_web_page_preview=True
if data["message_type"] == MessageType.SHORT
else False,
Expand Down Expand Up @@ -161,7 +164,7 @@ async def send_item_message(
chat_id=chat_id,
text=caption_text,
parse_mode=ParseMode.HTML,
reply_to_message_id=message.message_id if message else None,
reply_to_message_id=_reply_to,
disable_web_page_preview=True
if data["message_type"] == "short"
else False,
Expand Down
30 changes: 21 additions & 9 deletions apps/telegram-bot/core/services/outbox_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

_redis: aioredis.Redis | None = None
_consumer_task: asyncio.Task | None = None
_outbox_key: str | None = None


async def _get_redis() -> aioredis.Redis:
Expand All @@ -20,14 +21,15 @@ async def _get_redis() -> aioredis.Redis:


async def _consume_loop() -> None:
"""Background loop: BRPOP from the outbox queue and dispatch results."""
"""Background loop: BRPOP from the per-bot outbox queue and dispatch results."""
r = await _get_redis()
logger.info(f"Outbox consumer started, listening on '{OUTBOX_QUEUE_KEY}'")
key = _outbox_key or OUTBOX_QUEUE_KEY
logger.info(f"Outbox consumer started, listening on '{key}'")

while True:
try:
# BRPOP blocks until a message is available (timeout=0 means block forever)
result = await r.brpop(OUTBOX_QUEUE_KEY, timeout=0)
result = await r.brpop(key, timeout=0)
if result is None:
continue

Expand All @@ -45,7 +47,10 @@ async def _consume_loop() -> None:
metadata_item = payload.get("metadata_item")
if metadata_item and chat_id:
logger.info(f"[{job_id}] Delivering result to chat {chat_id}")
await send_item_message(metadata_item, chat_id=chat_id)
await send_item_message(
metadata_item, chat_id=chat_id,
message_id=payload.get("message_id"),
)
else:
logger.warning(f"[{job_id}] Invalid payload: missing metadata_item or chat_id")

Expand All @@ -71,19 +76,25 @@ async def _send_error_to_chat(chat_id: int | str, error: str) -> None:
logger.error(f"Failed to send error message to chat {chat_id}: {e}")


async def start() -> None:
"""Start the outbox consumer as a background asyncio task."""
global _consumer_task
async def start(bot_id: int) -> None:
"""Start the outbox consumer as a background asyncio task.

Args:
bot_id: Telegram bot user ID. Used to build the per-bot outbox key
so each bot only consumes its own results.
"""
global _consumer_task, _outbox_key
if _consumer_task is not None:
logger.warning("Outbox consumer already running")
return
_outbox_key = f"{OUTBOX_QUEUE_KEY}:{bot_id}"
_consumer_task = asyncio.create_task(_consume_loop())
logger.info("Outbox consumer task created")
logger.info(f"Outbox consumer task created for bot_id={bot_id}")


async def stop() -> None:
"""Stop the outbox consumer and close the Redis connection."""
global _consumer_task, _redis
global _consumer_task, _redis, _outbox_key

if _consumer_task is not None:
_consumer_task.cancel()
Expand All @@ -97,3 +108,4 @@ async def stop() -> None:
if _redis is not None:
await _redis.aclose()
_redis = None
_outbox_key = None
7 changes: 7 additions & 0 deletions tests/unit/async_worker/conftest.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import sys
from pathlib import Path
from unittest.mock import MagicMock

# Add the async-worker app directory to sys.path so 'async_worker' is importable
_app_dir = Path(__file__).resolve().parents[3] / "apps" / "async-worker"
if str(_app_dir) not in sys.path:
sys.path.insert(0, str(_app_dir))

# Pre-mock async_worker.celery_client to avoid Celery trying to connect to Redis
# during test collection and lazy imports in enrichment.py
_mock_celery_module = MagicMock()
_mock_celery_module.celery_app = MagicMock()
sys.modules.setdefault("async_worker.celery_client", _mock_celery_module)
5 changes: 5 additions & 0 deletions tests/unit/async_worker/test_enrichment.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ async def test_publishes_to_telegraph_when_enabled(
@pytest.mark.asyncio
async def test_skips_telegraph_when_disabled(self, base_metadata_item, mock_telegraph):
MockTg, instance = mock_telegraph
# Set a non-empty telegraph_url to avoid triggering the PDF fallback path
# (which tries to import celery_client and connect to Redis)
base_metadata_item["telegraph_url"] = "https://existing.url"
result = await enrich(
base_metadata_item, store_telegraph=False, store_document=False
)
Expand Down Expand Up @@ -173,6 +176,8 @@ class TestTitleStripping:
@pytest.mark.asyncio
async def test_strips_title_whitespace(self, base_metadata_item):
base_metadata_item["title"] = " padded title "
# Set a non-empty telegraph_url to avoid triggering the PDF fallback path
base_metadata_item["telegraph_url"] = "https://existing.url"
result = await enrich(
base_metadata_item, store_telegraph=False, store_document=False
)
Expand Down
24 changes: 23 additions & 1 deletion tests/unit/async_worker/test_outbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ async def test_returns_same_instance_on_second_call(self, mock_redis):

class TestPush:
@pytest.mark.asyncio
async def test_push_metadata_item(self, mock_redis):
async def test_push_metadata_item_with_bot_id(self, mock_redis):
with patch(
"async_worker.services.outbox.aioredis.from_url",
return_value=mock_redis,
Expand All @@ -81,19 +81,40 @@ async def test_push_metadata_item(self, mock_redis):
chat_id=12345,
metadata_item={"title": "Test", "content": "hi"},
message_id=99,
bot_id=123,
)

mock_redis.lpush.assert_awaited_once()
args = mock_redis.lpush.call_args
queue_key = args[0][0]
payload = json.loads(args[0][1])

assert "123" in queue_key # per-bot queue key
assert payload["job_id"] == "j1"
assert payload["chat_id"] == 12345
assert payload["message_id"] == 99
assert payload["metadata_item"] == {"title": "Test", "content": "hi"}
assert payload["error"] is None

@pytest.mark.asyncio
async def test_push_without_bot_id_uses_base_key(self, mock_redis):
with patch(
"async_worker.services.outbox.aioredis.from_url",
return_value=mock_redis,
):
from async_worker.services.outbox import push

await push(
job_id="j0",
chat_id=1,
metadata_item={"title": "Test"},
)

args = mock_redis.lpush.call_args
queue_key = args[0][0]
# Without bot_id, should use the plain OUTBOX_QUEUE_KEY
assert ":" not in queue_key.split("outbox")[-1]

@pytest.mark.asyncio
async def test_push_error(self, mock_redis):
with patch(
Expand All @@ -106,6 +127,7 @@ async def test_push_error(self, mock_redis):
job_id="j2",
chat_id=42,
error="something broke",
bot_id=456,
)

payload = json.loads(mock_redis.lpush.call_args[0][1])
Expand Down
Loading
Loading