From 14d38bf219743a2e073b86cb974ba8ef5bbc6aa5 Mon Sep 17 00:00:00 2001 From: aturret Date: Sun, 22 Mar 2026 19:19:47 -0500 Subject: [PATCH 1/3] feat: add bot id as queue identifier --- apps/async-worker/async_worker/main.py | 10 +++++++ .../async_worker/services/outbox.py | 12 ++++++-- .../async-worker/async_worker/tasks/scrape.py | 7 ++++- apps/telegram-bot/core/queue_client.py | 20 +++++++++---- apps/telegram-bot/core/services/bot_app.py | 7 +++-- .../core/services/outbox_consumer.py | 25 +++++++++++----- tests/unit/async_worker/conftest.py | 7 +++++ tests/unit/async_worker/test_enrichment.py | 5 ++++ tests/unit/async_worker/test_outbox.py | 24 ++++++++++++++- tests/unit/async_worker/test_scrape_task.py | 20 ++++++++++++- .../unit/telegram_bot/test_outbox_consumer.py | 8 +++-- tests/unit/telegram_bot/test_queue_client.py | 29 +++++++++++++++++-- 12 files changed, 146 insertions(+), 28 deletions(-) diff --git a/apps/async-worker/async_worker/main.py b/apps/async-worker/async_worker/main.py index cbae148..88a76ad 100644 --- a/apps/async-worker/async_worker/main.py +++ b/apps/async-worker/async_worker/main.py @@ -29,6 +29,9 @@ def parse_redis_url(url: str) -> RedisSettings: port=parsed.port or 6379, database=int(parsed.path.lstrip("/") or 0), password=parsed.password, + conn_timeout=60, + conn_retries=5, + conn_retry_delay=1, ) @@ -44,5 +47,12 @@ class WorkerSettings: # Maximum concurrent jobs max_jobs = 10 + # Retry jobs on transient failures (e.g. Redis timeout after task completes) + retry_jobs = True + max_tries = 3 + # Keep results for 1 hour keep_result = 3600 + + # Health-check the Redis connection every 30s to prevent stale connections + health_check_interval = 30 diff --git a/apps/async-worker/async_worker/services/outbox.py b/apps/async-worker/async_worker/services/outbox.py index bd25345..40ec9a8 100644 --- a/apps/async-worker/async_worker/services/outbox.py +++ b/apps/async-worker/async_worker/services/outbox.py @@ -22,9 +22,15 @@ async def push( metadata_item: dict | None = None, message_id: int | None = None, error: str | None = None, + bot_id: int | str | None = None, ) -> None: - """Push a result payload to the Redis outbox queue.""" + """Push a result payload to the per-bot Redis outbox queue. + + The queue key is ``{OUTBOX_QUEUE_KEY}:{bot_id}`` when *bot_id* is provided, + falling back to the plain ``OUTBOX_QUEUE_KEY`` for backward compatibility. + """ r = await get_outbox_redis() + queue_key = f"{OUTBOX_QUEUE_KEY}:{bot_id}" if bot_id is not None else OUTBOX_QUEUE_KEY payload = { "job_id": job_id, "chat_id": chat_id, @@ -32,8 +38,8 @@ async def push( "metadata_item": metadata_item, "error": error, } - await r.lpush(OUTBOX_QUEUE_KEY, json.dumps(payload, ensure_ascii=False)) - logger.info(f"Pushed result to outbox: job_id={job_id}, error={error is not None}") + await r.lpush(queue_key, json.dumps(payload, ensure_ascii=False)) + logger.info(f"Pushed result to outbox: job_id={job_id}, queue={queue_key}, error={error is not None}") async def close() -> None: diff --git a/apps/async-worker/async_worker/tasks/scrape.py b/apps/async-worker/async_worker/tasks/scrape.py index 9282434..8f62c36 100644 --- a/apps/async-worker/async_worker/tasks/scrape.py +++ b/apps/async-worker/async_worker/tasks/scrape.py @@ -17,6 +17,7 @@ async def scrape_and_enrich( message_id: int | None = None, source: str = "", content_type: str = "", + bot_id: int | str | None = None, store_telegraph: bool | None = None, store_document: bool | None = None, **kwargs, @@ -31,6 +32,8 @@ async def scrape_and_enrich( message_id: Optional Telegram message ID for reply threading. source: URL source platform (e.g. "twitter", "weibo"). content_type: Content type (e.g. "social_media", "video"). + bot_id: Telegram bot user ID. Used to route results to the correct + bot's outbox queue (``scrape:outbox:{bot_id}``). store_telegraph: Override Telegraph publishing flag. store_document: Override PDF export flag. **kwargs: Extra arguments passed to the scraper. @@ -64,12 +67,13 @@ async def scrape_and_enrich( logger.info(f"[{job_id}] Scrape completed successfully") - # Push to outbox + # Push to outbox (per-bot queue key) await outbox.push( job_id=job_id, chat_id=chat_id, message_id=message_id, metadata_item=metadata_item, + bot_id=bot_id, ) return {"job_id": job_id, "status": "success"} @@ -84,6 +88,7 @@ async def scrape_and_enrich( chat_id=chat_id, message_id=message_id, error=str(e), + bot_id=bot_id, ) return {"job_id": job_id, "status": "error", "error": str(e)} diff --git a/apps/telegram-bot/core/queue_client.py b/apps/telegram-bot/core/queue_client.py index ee15209..8a31039 100644 --- a/apps/telegram-bot/core/queue_client.py +++ b/apps/telegram-bot/core/queue_client.py @@ -6,6 +6,7 @@ from fastfetchbot_shared.utils.logger import logger _arq_redis: ArqRedis | None = None +_bot_id: int | None = None def _parse_redis_url(url: str) -> RedisSettings: @@ -21,20 +22,26 @@ def _parse_redis_url(url: str) -> RedisSettings: ) -async def init() -> None: - """Initialize the ARQ Redis connection pool.""" - global _arq_redis +async def init(bot_id: int) -> None: + """Initialize the ARQ Redis connection pool. + + Args: + bot_id: Telegram bot user ID (from application.bot.id). + """ + global _arq_redis, _bot_id if _arq_redis is None: + _bot_id = bot_id _arq_redis = await create_pool(_parse_redis_url(ARQ_REDIS_URL)) - logger.info("ARQ queue client initialized") + logger.info(f"ARQ queue client initialized for bot_id={bot_id}") async def close() -> None: """Close the ARQ Redis connection pool.""" - global _arq_redis + global _arq_redis, _bot_id if _arq_redis is not None: await _arq_redis.aclose() _arq_redis = None + _bot_id = None logger.info("ARQ queue client closed") @@ -50,7 +57,7 @@ async def enqueue_scrape( Returns the job_id (UUID string). """ - if _arq_redis is None: + if _arq_redis is None or _bot_id is None: raise RuntimeError("Queue client not initialized. Call queue_client.init() first.") job_id = str(uuid.uuid4()) @@ -62,6 +69,7 @@ async def enqueue_scrape( message_id=message_id, source=source, content_type=content_type, + bot_id=_bot_id, **kwargs, ) logger.info(f"Enqueued scrape job: job_id={job_id}, url={url}") diff --git a/apps/telegram-bot/core/services/bot_app.py b/apps/telegram-bot/core/services/bot_app.py index a2be308..5de6ea4 100644 --- a/apps/telegram-bot/core/services/bot_app.py +++ b/apps/telegram-bot/core/services/bot_app.py @@ -136,9 +136,10 @@ async def startup() -> None: from core import queue_client from core.services import outbox_consumer - await queue_client.init() - await outbox_consumer.start() - logger.info("Queue mode enabled: ARQ client and outbox consumer started") + bot_id = application.bot.id + await queue_client.init(bot_id=bot_id) + await outbox_consumer.start(bot_id=bot_id) + logger.info(f"Queue mode enabled: ARQ client and outbox consumer started (bot_id={bot_id})") if application.post_init: await application.post_init() diff --git a/apps/telegram-bot/core/services/outbox_consumer.py b/apps/telegram-bot/core/services/outbox_consumer.py index ec203ca..882cf96 100644 --- a/apps/telegram-bot/core/services/outbox_consumer.py +++ b/apps/telegram-bot/core/services/outbox_consumer.py @@ -9,6 +9,7 @@ _redis: aioredis.Redis | None = None _consumer_task: asyncio.Task | None = None +_outbox_key: str | None = None async def _get_redis() -> aioredis.Redis: @@ -20,14 +21,15 @@ async def _get_redis() -> aioredis.Redis: async def _consume_loop() -> None: - """Background loop: BRPOP from the outbox queue and dispatch results.""" + """Background loop: BRPOP from the per-bot outbox queue and dispatch results.""" r = await _get_redis() - logger.info(f"Outbox consumer started, listening on '{OUTBOX_QUEUE_KEY}'") + key = _outbox_key or OUTBOX_QUEUE_KEY + logger.info(f"Outbox consumer started, listening on '{key}'") while True: try: # BRPOP blocks until a message is available (timeout=0 means block forever) - result = await r.brpop(OUTBOX_QUEUE_KEY, timeout=0) + result = await r.brpop(key, timeout=0) if result is None: continue @@ -71,19 +73,25 @@ async def _send_error_to_chat(chat_id: int | str, error: str) -> None: logger.error(f"Failed to send error message to chat {chat_id}: {e}") -async def start() -> None: - """Start the outbox consumer as a background asyncio task.""" - global _consumer_task +async def start(bot_id: int) -> None: + """Start the outbox consumer as a background asyncio task. + + Args: + bot_id: Telegram bot user ID. Used to build the per-bot outbox key + so each bot only consumes its own results. + """ + global _consumer_task, _outbox_key if _consumer_task is not None: logger.warning("Outbox consumer already running") return + _outbox_key = f"{OUTBOX_QUEUE_KEY}:{bot_id}" _consumer_task = asyncio.create_task(_consume_loop()) - logger.info("Outbox consumer task created") + logger.info(f"Outbox consumer task created for bot_id={bot_id}") async def stop() -> None: """Stop the outbox consumer and close the Redis connection.""" - global _consumer_task, _redis + global _consumer_task, _redis, _outbox_key if _consumer_task is not None: _consumer_task.cancel() @@ -97,3 +105,4 @@ async def stop() -> None: if _redis is not None: await _redis.aclose() _redis = None + _outbox_key = None diff --git a/tests/unit/async_worker/conftest.py b/tests/unit/async_worker/conftest.py index b8206ce..4ac47ea 100644 --- a/tests/unit/async_worker/conftest.py +++ b/tests/unit/async_worker/conftest.py @@ -1,7 +1,14 @@ import sys from pathlib import Path +from unittest.mock import MagicMock # Add the async-worker app directory to sys.path so 'async_worker' is importable _app_dir = Path(__file__).resolve().parents[3] / "apps" / "async-worker" if str(_app_dir) not in sys.path: sys.path.insert(0, str(_app_dir)) + +# Pre-mock async_worker.celery_client to avoid Celery trying to connect to Redis +# during test collection and lazy imports in enrichment.py +_mock_celery_module = MagicMock() +_mock_celery_module.celery_app = MagicMock() +sys.modules.setdefault("async_worker.celery_client", _mock_celery_module) diff --git a/tests/unit/async_worker/test_enrichment.py b/tests/unit/async_worker/test_enrichment.py index dc7caa9..376f9c0 100644 --- a/tests/unit/async_worker/test_enrichment.py +++ b/tests/unit/async_worker/test_enrichment.py @@ -64,6 +64,9 @@ async def test_publishes_to_telegraph_when_enabled( @pytest.mark.asyncio async def test_skips_telegraph_when_disabled(self, base_metadata_item, mock_telegraph): MockTg, instance = mock_telegraph + # Set a non-empty telegraph_url to avoid triggering the PDF fallback path + # (which tries to import celery_client and connect to Redis) + base_metadata_item["telegraph_url"] = "https://existing.url" result = await enrich( base_metadata_item, store_telegraph=False, store_document=False ) @@ -173,6 +176,8 @@ class TestTitleStripping: @pytest.mark.asyncio async def test_strips_title_whitespace(self, base_metadata_item): base_metadata_item["title"] = " padded title " + # Set a non-empty telegraph_url to avoid triggering the PDF fallback path + base_metadata_item["telegraph_url"] = "https://existing.url" result = await enrich( base_metadata_item, store_telegraph=False, store_document=False ) diff --git a/tests/unit/async_worker/test_outbox.py b/tests/unit/async_worker/test_outbox.py index 37f29e6..88f7ccf 100644 --- a/tests/unit/async_worker/test_outbox.py +++ b/tests/unit/async_worker/test_outbox.py @@ -69,7 +69,7 @@ async def test_returns_same_instance_on_second_call(self, mock_redis): class TestPush: @pytest.mark.asyncio - async def test_push_metadata_item(self, mock_redis): + async def test_push_metadata_item_with_bot_id(self, mock_redis): with patch( "async_worker.services.outbox.aioredis.from_url", return_value=mock_redis, @@ -81,6 +81,7 @@ async def test_push_metadata_item(self, mock_redis): chat_id=12345, metadata_item={"title": "Test", "content": "hi"}, message_id=99, + bot_id=123, ) mock_redis.lpush.assert_awaited_once() @@ -88,12 +89,32 @@ async def test_push_metadata_item(self, mock_redis): queue_key = args[0][0] payload = json.loads(args[0][1]) + assert "123" in queue_key # per-bot queue key assert payload["job_id"] == "j1" assert payload["chat_id"] == 12345 assert payload["message_id"] == 99 assert payload["metadata_item"] == {"title": "Test", "content": "hi"} assert payload["error"] is None + @pytest.mark.asyncio + async def test_push_without_bot_id_uses_base_key(self, mock_redis): + with patch( + "async_worker.services.outbox.aioredis.from_url", + return_value=mock_redis, + ): + from async_worker.services.outbox import push + + await push( + job_id="j0", + chat_id=1, + metadata_item={"title": "Test"}, + ) + + args = mock_redis.lpush.call_args + queue_key = args[0][0] + # Without bot_id, should use the plain OUTBOX_QUEUE_KEY + assert ":" not in queue_key.split("outbox")[-1] + @pytest.mark.asyncio async def test_push_error(self, mock_redis): with patch( @@ -106,6 +127,7 @@ async def test_push_error(self, mock_redis): job_id="j2", chat_id=42, error="something broke", + bot_id=456, ) payload = json.loads(mock_redis.lpush.call_args[0][1]) diff --git a/tests/unit/async_worker/test_scrape_task.py b/tests/unit/async_worker/test_scrape_task.py index 64b5f8b..2b5d90e 100644 --- a/tests/unit/async_worker/test_scrape_task.py +++ b/tests/unit/async_worker/test_scrape_task.py @@ -152,6 +152,7 @@ async def test_pushes_result_to_outbox( chat_id=42, job_id="j1", message_id=99, + bot_id=123, ) mock_outbox.push.assert_awaited_once() call_kwargs = mock_outbox.push.call_args.kwargs @@ -159,6 +160,22 @@ async def test_pushes_result_to_outbox( assert call_kwargs["chat_id"] == 42 assert call_kwargs["message_id"] == 99 assert call_kwargs["metadata_item"] is not None + assert call_kwargs["bot_id"] == 123 + + @pytest.mark.asyncio + async def test_pushes_result_without_bot_id( + self, ctx, mock_info_extract, mock_enrichment, mock_outbox + ): + """When bot_id is not provided, it should still be passed as None.""" + await scrape_and_enrich( + ctx, + url="u", + chat_id=42, + job_id="j1", + ) + mock_outbox.push.assert_awaited_once() + call_kwargs = mock_outbox.push.call_args.kwargs + assert call_kwargs["bot_id"] is None # --------------------------------------------------------------------------- @@ -177,13 +194,14 @@ async def test_scraping_failure_pushes_error( MockCls.return_value = instance result = await scrape_and_enrich( - ctx, url="u", chat_id=1, job_id="j-err" + ctx, url="u", chat_id=1, job_id="j-err", bot_id=789 ) assert result["status"] == "error" assert "scrape boom" in result["error"] mock_outbox.push.assert_awaited_once() assert mock_outbox.push.call_args.kwargs["error"] == "scrape boom" + assert mock_outbox.push.call_args.kwargs["bot_id"] == 789 @pytest.mark.asyncio async def test_enrichment_failure_pushes_error( diff --git a/tests/unit/telegram_bot/test_outbox_consumer.py b/tests/unit/telegram_bot/test_outbox_consumer.py index b853d19..8de2e12 100644 --- a/tests/unit/telegram_bot/test_outbox_consumer.py +++ b/tests/unit/telegram_bot/test_outbox_consumer.py @@ -19,9 +19,11 @@ def reset_module_state(): oc._redis = None oc._consumer_task = None + oc._outbox_key = None yield oc._redis = None oc._consumer_task = None + oc._outbox_key = None @pytest.fixture @@ -275,8 +277,9 @@ async def test_start_creates_task(self, mock_redis): "_consume_loop", new_callable=AsyncMock, ): - await oc.start() + await oc.start(bot_id=123) assert oc._consumer_task is not None + assert oc._outbox_key == "scrape:outbox:123" # Clean up await oc.stop() @@ -289,7 +292,7 @@ async def test_start_idempotent(self, mock_redis): fake_task = MagicMock() oc._consumer_task = fake_task - await oc.start() + await oc.start(bot_id=456) # Should still be the original fake task assert oc._consumer_task is fake_task @@ -317,6 +320,7 @@ async def _noop(): mock_redis.aclose.assert_awaited_once() assert oc._consumer_task is None assert oc._redis is None + assert oc._outbox_key is None @pytest.mark.asyncio async def test_stop_when_not_running(self): diff --git a/tests/unit/telegram_bot/test_queue_client.py b/tests/unit/telegram_bot/test_queue_client.py index 37e9872..afff2fd 100644 --- a/tests/unit/telegram_bot/test_queue_client.py +++ b/tests/unit/telegram_bot/test_queue_client.py @@ -17,8 +17,10 @@ def reset_module_state(): import core.queue_client as qc qc._arq_redis = None + qc._bot_id = None yield qc._arq_redis = None + qc._bot_id = None @pytest.fixture @@ -75,8 +77,9 @@ async def test_init_creates_pool(self, mock_arq_pool): ): import core.queue_client as qc - await qc.init() + await qc.init(bot_id=123) assert qc._arq_redis is mock_arq_pool + assert qc._bot_id == 123 @pytest.mark.asyncio async def test_init_idempotent(self, mock_arq_pool): @@ -87,9 +90,11 @@ async def test_init_idempotent(self, mock_arq_pool): ) as mock_create: import core.queue_client as qc - await qc.init() - await qc.init() + await qc.init(bot_id=123) + await qc.init(bot_id=456) mock_create.assert_awaited_once() + # Should keep the first bot_id + assert qc._bot_id == 123 @pytest.mark.asyncio async def test_close_closes_pool(self, mock_arq_pool): @@ -100,6 +105,7 @@ async def test_close_closes_pool(self, mock_arq_pool): mock_arq_pool.aclose.assert_awaited_once() assert qc._arq_redis is None + assert qc._bot_id is None @pytest.mark.asyncio async def test_close_idempotent(self): @@ -122,6 +128,16 @@ async def test_raises_when_not_initialized(self): import core.queue_client as qc qc._arq_redis = None + qc._bot_id = None + with pytest.raises(RuntimeError, match="not initialized"): + await qc.enqueue_scrape(url="https://example.com", chat_id=1) + + @pytest.mark.asyncio + async def test_raises_when_bot_id_not_set(self, mock_arq_pool): + import core.queue_client as qc + + qc._arq_redis = mock_arq_pool + qc._bot_id = None with pytest.raises(RuntimeError, match="not initialized"): await qc.enqueue_scrape(url="https://example.com", chat_id=1) @@ -130,6 +146,7 @@ async def test_returns_uuid_job_id(self, mock_arq_pool): import core.queue_client as qc qc._arq_redis = mock_arq_pool + qc._bot_id = 123 job_id = await qc.enqueue_scrape(url="https://example.com", chat_id=42) # Should be a valid UUID @@ -140,6 +157,7 @@ async def test_enqueues_with_correct_args(self, mock_arq_pool): import core.queue_client as qc qc._arq_redis = mock_arq_pool + qc._bot_id = 123 job_id = await qc.enqueue_scrape( url="https://twitter.com/post/1", chat_id=42, @@ -157,12 +175,14 @@ async def test_enqueues_with_correct_args(self, mock_arq_pool): assert call_args.kwargs["message_id"] == 99 assert call_args.kwargs["source"] == "twitter" assert call_args.kwargs["content_type"] == "social_media" + assert call_args.kwargs["bot_id"] == 123 @pytest.mark.asyncio async def test_passes_extra_kwargs(self, mock_arq_pool): import core.queue_client as qc qc._arq_redis = mock_arq_pool + qc._bot_id = 123 await qc.enqueue_scrape( url="u", chat_id=1, store_telegraph=True, store_document=False ) @@ -170,15 +190,18 @@ async def test_passes_extra_kwargs(self, mock_arq_pool): call_kwargs = mock_arq_pool.enqueue_job.call_args.kwargs assert call_kwargs["store_telegraph"] is True assert call_kwargs["store_document"] is False + assert call_kwargs["bot_id"] == 123 @pytest.mark.asyncio async def test_minimal_args(self, mock_arq_pool): import core.queue_client as qc qc._arq_redis = mock_arq_pool + qc._bot_id = 456 job_id = await qc.enqueue_scrape(url="https://example.com", chat_id=1) call_kwargs = mock_arq_pool.enqueue_job.call_args.kwargs assert call_kwargs["source"] == "" assert call_kwargs["content_type"] == "" assert call_kwargs["message_id"] is None + assert call_kwargs["bot_id"] == 456 From ee6e458a92372dec6fa55ae48fdc9188b55e5a3c Mon Sep 17 00:00:00 2001 From: aturret Date: Sun, 22 Mar 2026 19:31:35 -0500 Subject: [PATCH 2/3] fix: reply message functionality --- apps/telegram-bot/core/handlers/url_process.py | 2 +- apps/telegram-bot/core/services/message_sender.py | 11 +++++++---- apps/telegram-bot/core/services/outbox_consumer.py | 5 ++++- tests/unit/telegram_bot/test_outbox_consumer.py | 2 ++ 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/apps/telegram-bot/core/handlers/url_process.py b/apps/telegram-bot/core/handlers/url_process.py index 2d54efc..43dfcd3 100644 --- a/apps/telegram-bot/core/handlers/url_process.py +++ b/apps/telegram-bot/core/handlers/url_process.py @@ -67,7 +67,7 @@ async def _fetch_and_send( await queue_client.enqueue_scrape( url=url, chat_id=chat_id, - message_id=message_id, + message_id=message_id or message.message_id if message else None, source=source, content_type=content_type, **kwargs, diff --git a/apps/telegram-bot/core/services/message_sender.py b/apps/telegram-bot/core/services/message_sender.py index 3ed2c08..05c33b9 100644 --- a/apps/telegram-bot/core/services/message_sender.py +++ b/apps/telegram-bot/core/services/message_sender.py @@ -52,16 +52,19 @@ def _get_application(): async def send_item_message( - data: dict, chat_id: Union[int, str] = None, message: Message = None + data: dict, chat_id: Union[int, str] = None, message: Message = None, + message_id: int = None, ) -> None: """ :param data: (dict) metadata of the item :param chat_id: (int) any chat id for sending :param message: (Message) any message to reply + :param message_id: (int) bare message ID for reply threading (used when Message object is unavailable, e.g. outbox consumer) :return: """ application = _get_application() logger.debug(f"send_item_message: {data}, {chat_id}, {message}") + _reply_to = message.message_id if message else message_id if not chat_id and not message: raise ValueError("must provide chat_id or message") if ( @@ -100,7 +103,7 @@ async def send_item_message( parse_mode=ParseMode.HTML, caption=caption_text, write_timeout=TELEBOT_WRITE_TIMEOUT, - reply_to_message_id=message.message_id if message else None, + reply_to_message_id=_reply_to, ) if sent_media_files_message is tuple: reply_to_message_id = sent_media_files_message[0].message_id @@ -112,7 +115,7 @@ async def send_item_message( chat_id=chat_id, text=caption_text, parse_mode=ParseMode.HTML, - reply_to_message_id=message.message_id if message else None, + reply_to_message_id=_reply_to, disable_web_page_preview=True if data["message_type"] == MessageType.SHORT else False, @@ -161,7 +164,7 @@ async def send_item_message( chat_id=chat_id, text=caption_text, parse_mode=ParseMode.HTML, - reply_to_message_id=message.message_id if message else None, + reply_to_message_id=_reply_to, disable_web_page_preview=True if data["message_type"] == "short" else False, diff --git a/apps/telegram-bot/core/services/outbox_consumer.py b/apps/telegram-bot/core/services/outbox_consumer.py index 882cf96..426eb27 100644 --- a/apps/telegram-bot/core/services/outbox_consumer.py +++ b/apps/telegram-bot/core/services/outbox_consumer.py @@ -47,7 +47,10 @@ async def _consume_loop() -> None: metadata_item = payload.get("metadata_item") if metadata_item and chat_id: logger.info(f"[{job_id}] Delivering result to chat {chat_id}") - await send_item_message(metadata_item, chat_id=chat_id) + await send_item_message( + metadata_item, chat_id=chat_id, + message_id=payload.get("message_id"), + ) else: logger.warning(f"[{job_id}] Invalid payload: missing metadata_item or chat_id") diff --git a/tests/unit/telegram_bot/test_outbox_consumer.py b/tests/unit/telegram_bot/test_outbox_consumer.py index 8de2e12..d3d1fb4 100644 --- a/tests/unit/telegram_bot/test_outbox_consumer.py +++ b/tests/unit/telegram_bot/test_outbox_consumer.py @@ -97,6 +97,7 @@ async def test_delivers_metadata_item(self, mock_redis): payload = _make_payload( metadata_item={"title": "Test", "content": "hi"}, chat_id=42, + message_id=99, ) call_count = 0 @@ -125,6 +126,7 @@ async def brpop_side_effect(*args, **kwargs): mock_send.assert_awaited_once() call_kwargs = mock_send.call_args.kwargs assert call_kwargs["chat_id"] == 42 + assert call_kwargs["message_id"] == 99 # --------------------------------------------------------------------------- From dc00916cb958b1f966f4713e43d59f5bfe346726 Mon Sep 17 00:00:00 2001 From: aturret Date: Sun, 22 Mar 2026 19:51:33 -0500 Subject: [PATCH 3/3] feat: fix logic for reply to message --- apps/telegram-bot/core/handlers/url_process.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/telegram-bot/core/handlers/url_process.py b/apps/telegram-bot/core/handlers/url_process.py index 43dfcd3..74113ea 100644 --- a/apps/telegram-bot/core/handlers/url_process.py +++ b/apps/telegram-bot/core/handlers/url_process.py @@ -67,7 +67,7 @@ async def _fetch_and_send( await queue_client.enqueue_scrape( url=url, chat_id=chat_id, - message_id=message_id or message.message_id if message else None, + message_id=message_id if message_id is not None else (message.message_id if message else None), source=source, content_type=content_type, **kwargs,