
fix: add bot id as queue identifier and fix reply to message#70

Merged
aturret merged 3 commits into main from apq-fix on Mar 23, 2026

@aturret
Owner

@aturret aturret commented Mar 23, 2026

Summary by CodeRabbit

  • New Features

    • Multi-bot support with per-bot queue routing and per-bot consumer startup.
    • Automatic job retries with configurable limits.
  • Improvements

    • Redis connection resilience with timeouts, retries and periodic health checks.
    • Message reply preservation: queued and delivered items now carry reply/message identifiers to maintain threading.
  • Tests

    • Expanded tests to cover per-bot routing and retry/health behaviors.

@coderabbitai
Contributor

coderabbitai bot commented Mar 23, 2026

Caution

Review failed

The pull request is closed.

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 29b63047-a2a4-45ef-8be3-b08b7fc257ac

📥 Commits

Reviewing files that changed from the base of the PR and between ee6e458 and dc00916.

📒 Files selected for processing (1)
  • apps/telegram-bot/core/handlers/url_process.py

📝 Walkthrough

Introduce per-bot queue routing by threading bot_id through enqueue, worker, outbox, and consumer paths; add Redis connection resilience (timeouts/retries/health checks); and allow optional message_id threading for reply behavior when Telegram Message objects are absent.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| Redis config & worker settings: apps/async-worker/async_worker/main.py | Add conn_timeout=60, conn_retries=5, conn_retry_delay=1 to RedisSettings; enable retry_jobs=True, max_tries=3, and health_check_interval=30 in WorkerSettings. |
| Outbox service & scrape task: apps/async-worker/async_worker/services/outbox.py, apps/async-worker/async_worker/tasks/scrape.py | outbox.push(...) gains optional bot_id and routes to OUTBOX_QUEUE_KEY or OUTBOX_QUEUE_KEY:{bot_id}. scrape_and_enrich(...) accepts bot_id and forwards it to outbox.push on success/error. |
| Queue client init & enqueue: apps/telegram-bot/core/queue_client.py | init(bot_id: int) now stores module _bot_id; enqueue_scrape requires _arq_redis and _bot_id and includes bot_id=_bot_id in job payloads. |
| Bot app startup wiring: apps/telegram-bot/core/services/bot_app.py | When SCRAPE_MODE=="queue", pass application.bot.id into queue_client.init(...) and outbox_consumer.start(...); logs updated to include bot_id. |
| Outbox consumer & message sending: apps/telegram-bot/core/services/outbox_consumer.py, apps/telegram-bot/core/services/message_sender.py, apps/telegram-bot/core/handlers/url_process.py | outbox_consumer.start(bot_id) sets _outbox_key = f"{OUTBOX_QUEUE_KEY}:{bot_id}" and BRPOP from it; success path passes message_id=payload.get("message_id") to send_item_message. send_item_message accepts optional message_id and derives _reply_to from message or message_id. _fetch_and_send derives message_id from message when not provided. |
| Tests (async_worker): tests/unit/async_worker/conftest.py, tests/unit/async_worker/test_enrichment.py, tests/unit/async_worker/test_outbox.py, tests/unit/async_worker/test_scrape_task.py | Add celery_client MagicMock stub; adjust enrichment test inputs to avoid PDF fallback; update outbox and scrape task tests to assert bot_id present/absent behavior and queue key composition. |
| Tests (telegram bot): tests/unit/telegram_bot/test_queue_client.py, tests/unit/telegram_bot/test_outbox_consumer.py | Require init(bot_id=...) in tests, assert module _bot_id lifecycle, expand enqueue tests to check bot_id inclusion and error cases, and verify _outbox_key assignment and message_id passthrough in outbox consumer tests. |
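
The queue key composition described for outbox.push(...) can be sketched roughly as follows. This is a minimal illustration, not the repository's code: the helper name outbox_key and the literal value "outbox" for OUTBOX_QUEUE_KEY are assumptions made for the example.

```python
from __future__ import annotations

# Assumed value; the PR only refers to the constant by name.
OUTBOX_QUEUE_KEY = "outbox"


def outbox_key(bot_id: int | None = None) -> str:
    """Return the per-bot outbox key, or the base key when no bot_id is given."""
    if bot_id is None:
        return OUTBOX_QUEUE_KEY
    return f"{OUTBOX_QUEUE_KEY}:{bot_id}"


print(outbox_key())       # base key, used when bot_id is absent
print(outbox_key(12345))  # per-bot key
```

Comparing against None rather than relying on truthiness keeps a bot_id of 0 from silently falling back to the base key.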

Sequence Diagram(s)

sequenceDiagram
    participant App as Bot App
    participant QC as queue_client
    participant Redis as Redis
    participant Task as worker task
    participant Outbox as Outbox service
    participant OC as outbox_consumer
    participant Sender as message_sender

    App->>QC: init(bot_id)
    QC->>Redis: create pool
    QC->>QC: store _bot_id

    App->>QC: enqueue_scrape(url, chat_id, message_id?)
    QC->>Redis: enqueue_job("scrape_and_enrich", bot_id=_bot_id, ...)
    
    Redis-->>Task: deliver job
    Task->>Outbox: push(job_id, chat_id, metadata..., message_id?, bot_id=...)
    Outbox->>Redis: LPUSH to f"outbox:{bot_id}" or base key

    OC->>OC: start(bot_id)
    OC->>Redis: BRPOP from f"outbox:{bot_id}"
    Redis-->>OC: payload
    OC->>Sender: send_item_message(data, chat_id, message_id=payload.message_id)
    Sender->>Sender: resolve reply_to from message or message_id
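
The diagram's push/consume path can be approximated in memory as below. This is a hedged sketch only: FakeRedis, push, and consume_one are hypothetical stand-ins, rpop here replaces the blocking BRPOP, and the base key "outbox" is taken from the diagram labels rather than from the code.

```python
from __future__ import annotations

import json
from collections import defaultdict, deque


class FakeRedis:
    """In-memory stand-in for the Redis list commands used in the diagram."""

    def __init__(self) -> None:
        self.lists = defaultdict(deque)

    def lpush(self, key: str, value: str) -> None:
        self.lists[key].appendleft(value)

    def rpop(self, key: str) -> str | None:
        # Non-blocking substitute for BRPOP: returns None instead of waiting.
        queue = self.lists[key]
        return queue.pop() if queue else None


def push(redis: FakeRedis, payload: dict, bot_id: int | None = None) -> None:
    """Worker side: route the result to the per-bot list (or the base key)."""
    key = "outbox" if bot_id is None else f"outbox:{bot_id}"
    redis.lpush(key, json.dumps(payload))


def consume_one(redis: FakeRedis, bot_id: int) -> dict | None:
    """Bot side: pop the oldest payload from this bot's own list only."""
    raw = redis.rpop(f"outbox:{bot_id}")
    return None if raw is None else json.loads(raw)


redis = FakeRedis()
push(redis, {"job_id": "a", "chat_id": 1, "message_id": 10}, bot_id=111)
push(redis, {"job_id": "b", "chat_id": 2, "message_id": 20}, bot_id=222)

payload = consume_one(redis, 111)
print(payload["job_id"], payload.get("message_id"))
print(consume_one(redis, 111))  # bot 111 never sees bot 222's payload
```

Because each consumer pops only from its own key, two bots sharing one Redis instance cannot deliver each other's results.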

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Poem

🐰 A rabbit hops with a tiny bot_id,
Queues split like paths where carrots hide.
Retries and heartbeats keep Redis awake,
Messages reply by the id that we make.
Hooray for routed hops — quick jobs on their ride! 🥕

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 42.86% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title accurately describes the two main changes: adding bot_id as a queue identifier for multi-bot support and fixing the reply-to-message handling. |


@codecov-commenter

codecov-commenter commented Mar 23, 2026

Codecov Report

❌ Patch coverage is 80.76923% with 5 lines in your changes missing coverage. Please review.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| apps/telegram-bot/core/services/bot_app.py | 0.00% | 4 Missing ⚠️ |
| apps/telegram-bot/core/services/message_sender.py | 0.00% | 1 Missing ⚠️ |

@@           Coverage Diff           @@
##             main      #70   +/-   ##
=======================================
  Coverage        ?   71.77%           
=======================================
  Files           ?       61           
  Lines           ?     3082           
  Branches        ?        0           
=======================================
  Hits            ?     2212           
  Misses          ?      870           
  Partials        ?        0           
| Files with missing lines | Coverage Δ |
| --- | --- |
| apps/async-worker/async_worker/main.py | 100.00% <100.00%> (ø) |
| apps/async-worker/async_worker/services/outbox.py | 100.00% <100.00%> (ø) |
| apps/async-worker/async_worker/tasks/scrape.py | 100.00% <ø> (ø) |
| apps/telegram-bot/core/handlers/url_process.py | 13.48% <ø> (ø) |
| apps/telegram-bot/core/queue_client.py | 100.00% <100.00%> (ø) |
| apps/telegram-bot/core/services/outbox_consumer.py | 95.52% <100.00%> (ø) |
| apps/telegram-bot/core/services/message_sender.py | 15.66% <0.00%> (ø) |
| apps/telegram-bot/core/services/bot_app.py | 23.65% <0.00%> (ø) |
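
As a quick sanity check on the report's figures, the arithmetic can be reproduced from the numbers quoted above. This sketch assumes that patch coverage is simply covered changed lines divided by total changed lines; the variable names are illustrative.

```python
# Figures quoted in the Codecov report above.
missing = 5
patch_pct = 80.76923

# covered / total = patch_pct / 100 and total - covered = missing,
# so total = missing / (1 - patch_pct / 100).
total = round(missing / (1 - patch_pct / 100))
covered = total - missing
print(total, covered, round(100 * covered / total, 5))

# Project-level coverage from the diff block: Hits / Lines.
hits, lines, misses = 2212, 3082, 870
print(hits + misses == lines, round(100 * hits / lines, 2))
```

Under that assumption the patch touched 26 coverable lines, 21 of them covered, and the 5 missing lines match the per-file counts (4 in bot_app.py plus 1 in message_sender.py).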

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (1)
apps/telegram-bot/core/services/message_sender.py (1)

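55-56: Use explicit | None type hints per PEP 484, which discourages implicit Optional.

Static analysis flags implicit Optional usage: parameters that default to None while their annotation does not include None. Update the type hints to be explicit (the X | None spelling is PEP 604 syntax, evaluated at runtime only on Python 3.10+):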

♻️ Proposed fix
 async def send_item_message(
-        data: dict, chat_id: Union[int, str] = None, message: Message = None,
-        message_id: int = None,
+        data: dict, chat_id: Union[int, str] | None = None, message: Message | None = None,
+        message_id: int | None = None,
 ) -> None:
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/telegram-bot/core/services/message_sender.py` around lines 55 - 56,
Update the function signature in message_sender.py to use explicit
union-with-None type hints per PEP 484: change chat_id: Union[int, str] = None
to chat_id: Union[int, str] | None = None, change message: Message = None to
message: Message | None = None, and change message_id: int = None to message_id:
int | None = None so the parameters explicitly allow None; keep data: dict
unchanged unless it should also be optional.
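
For illustration, explicit None hints on a reply-resolution helper might look like the sketch below. The Message dataclass is a stand-in for the Telegram type, resolve_reply_to is a hypothetical name, and the preference for the Message object over message_id is an assumption; the walkthrough only says the id is derived "from message or message_id".

```python
from __future__ import annotations  # lets `X | None` annotations parse before Python 3.10

from dataclasses import dataclass


@dataclass
class Message:
    """Stand-in for telegram.Message; only message_id is modelled here."""

    message_id: int


def resolve_reply_to(
    message: Message | None = None,
    message_id: int | None = None,
) -> int | None:
    """Pick the id to reply to, preferring the Message object when present.

    The preference order is an assumption made for this sketch.
    """
    if message is not None:
        return message.message_id
    return message_id


print(resolve_reply_to(message=Message(message_id=7)))
print(resolve_reply_to(message_id=9))
print(resolve_reply_to())
```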
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@apps/async-worker/async_worker/main.py`:
- Around line 50-52: The current config enables retry_jobs=True with max_tries=3
which can produce duplicate outbox entries because outbox.push() (in
services/outbox.py) does a plain lpush and the outbox consumer
(apps/telegram-bot/core/services/outbox_consumer.py) only logs job_id without
deduplication; fix by adding idempotency: modify outbox.push() to atomically
check-and-set a Redis SET (or Redis SETNX key per job_id) before performing
lpush so the same job_id is not pushed twice, or alternatively set
retry_jobs=False in main.py (disable retry_jobs / adjust max_tries) and
implement manual retries that use the same idempotency guard, or implement
deduplication in outbox_consumer.py by ignoring payloads whose job_id is already
recorded in a Redis set; update references to retry_jobs, max_tries,
outbox.push(), and the consumer's job_id handling accordingly.

In `@apps/telegram-bot/core/handlers/url_process.py`:
- Line 70: The current expression `message_id or message.message_id if message
else None` suffers from operator precedence and will yield None when `message`
is None even if `message_id` is provided; update it to explicitly prefer an
explicit `message_id` and fall back to `message.message_id` only when `message`
exists, e.g. replace with `message_id if message_id is not None else
(message.message_id if message else None)` so `message_id` is preserved when
set; adjust the assignment around the existing `message_id`/`message.message_id`
usage in url_process.py.
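
The precedence problem is easy to reproduce in isolation. In the sketch below, buggy and fixed are hypothetical wrappers around the two expressions quoted in the comment, and SimpleNamespace stands in for a Telegram Message.

```python
from types import SimpleNamespace


def buggy(message, message_id):
    # Parsed as: (message_id or message.message_id) if message else None
    return message_id or message.message_id if message else None


def fixed(message, message_id):
    # An explicit message_id wins; fall back to the Message only if it exists.
    return message_id if message_id is not None else (
        message.message_id if message else None
    )


msg = SimpleNamespace(message_id=7)

print(buggy(None, 42))  # explicit message_id is lost when message is None
print(fixed(None, 42))  # explicit message_id is preserved
print(fixed(msg, None))  # falls back to message.message_id
```

The conditional expression binds more loosely than `or`, so the whole `message_id or message.message_id` part becomes the "if true" branch and a missing message discards an explicitly supplied id.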


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: b0e8aad0-514f-4cb3-a1cd-1d4719e640d8

📥 Commits

Reviewing files that changed from the base of the PR and between b04bf59 and ee6e458.

📒 Files selected for processing (14)
  • apps/async-worker/async_worker/main.py
  • apps/async-worker/async_worker/services/outbox.py
  • apps/async-worker/async_worker/tasks/scrape.py
  • apps/telegram-bot/core/handlers/url_process.py
  • apps/telegram-bot/core/queue_client.py
  • apps/telegram-bot/core/services/bot_app.py
  • apps/telegram-bot/core/services/message_sender.py
  • apps/telegram-bot/core/services/outbox_consumer.py
  • tests/unit/async_worker/conftest.py
  • tests/unit/async_worker/test_enrichment.py
  • tests/unit/async_worker/test_outbox.py
  • tests/unit/async_worker/test_scrape_task.py
  • tests/unit/telegram_bot/test_outbox_consumer.py
  • tests/unit/telegram_bot/test_queue_client.py

Comment on lines +50 to +52
# Retry jobs on transient failures (e.g. Redis timeout after task completes)
retry_jobs = True
max_tries = 3
Contributor

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for deduplication or job_id tracking in outbox consumer
rg -n -C3 'job_id|dedupe|duplicate|idempoten' apps/telegram-bot/core/services/outbox_consumer.py

Repository: aturret/FastFetchBot

Length of output: 1128


🏁 Script executed:

#!/bin/bash
# 1. Read the outbox.push() function to confirm it uses simple lpush
echo "=== Checking outbox.push() implementation ===" 
sed -n '19,42p' apps/async-worker/async_worker/services/outbox.py

# 2. Check the full main.py context around lines 50-52
echo -e "\n=== Checking main.py retry settings context ===" 
sed -n '40,65p' apps/async-worker/async_worker/main.py

# 3. Search for any SET-based or job_id tracking in outbox or redis logic
echo -e "\n=== Searching for idempotency mechanisms ===" 
rg -n 'SET|setnx|set_if_not_exists|job_id.*track' apps/async-worker/async_worker/services/ -A2

Repository: aturret/FastFetchBot

Length of output: 1635


Duplicate outbox entries possible with retry_jobs=True.

Enabling retry_jobs=True with max_tries=3 can cause duplicate messages. The outbox.push() function (apps/async-worker/async_worker/services/outbox.py) performs a simple lpush without idempotency checks. If a task succeeds (pushes to outbox) but crashes before ARQ marks the job complete, the retry will push a duplicate entry that the consumer will process.

The outbox consumer (apps/telegram-bot/core/services/outbox_consumer.py) extracts the job_id from the payload but uses it only for logging, with no deduplication logic.

Fix by:

  1. Adding idempotency to outbox.push() using a Redis SET to track processed job IDs, or
  2. Using retry_jobs=False and implementing manual retry logic with idempotency guards, or
  3. Accepting duplicates and implementing deduplication in the consumer
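
Option 1 could look roughly like the following in-memory sketch. FakeRedis, set_nx, push_once, and the "outbox:seen:{job_id}" guard key are all hypothetical names; set_nx only mimics the semantics of Redis SET with the NX option. Note that this sketch runs the guard and the push as two separate steps, so a crash between them would drop the message; a real implementation would likely need to combine them atomically and expire the guard keys.

```python
from __future__ import annotations

import json


class FakeRedis:
    """In-memory stand-in; set_nx mimics Redis SET with the NX option."""

    def __init__(self) -> None:
        self.keys: dict[str, str] = {}
        self.lists: dict[str, list[str]] = {}

    def set_nx(self, key: str, value: str) -> bool:
        """Set key only if absent; return True when it was newly set."""
        if key in self.keys:
            return False
        self.keys[key] = value
        return True

    def lpush(self, key: str, value: str) -> None:
        self.lists.setdefault(key, []).insert(0, value)


def push_once(redis: FakeRedis, job_id: str, payload: dict,
              bot_id: int | None = None) -> bool:
    """Push a result at most once per job_id; return False for a repeat."""
    if not redis.set_nx(f"outbox:seen:{job_id}", "1"):
        return False  # a retried job already delivered its result
    key = "outbox" if bot_id is None else f"outbox:{bot_id}"
    redis.lpush(key, json.dumps({"job_id": job_id, **payload}))
    return True


redis = FakeRedis()
print(push_once(redis, "job-1", {"chat_id": 1}, bot_id=5))  # first attempt
print(push_once(redis, "job-1", {"chat_id": 1}, bot_id=5))  # retry is skipped
print(len(redis.lists["outbox:5"]))
```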

@aturret aturret merged commit fe687ec into main Mar 23, 2026
2 of 3 checks passed