From fa043952b7eccf40d452463e49218580f85f6500 Mon Sep 17 00:00:00 2001 From: Sameer Pashikanti Date: Tue, 1 Apr 2025 10:42:51 +0000 Subject: [PATCH 1/2] Implement image URL sanitization and add worker readiness/liveness checks - Introduced `sanitizeImageUrl` function to ensure proper handling of image URLs during local development. - Updated `AspectCard` and `ProjectLibraryAspect` components to utilize the new sanitization function for image URLs. - Added scripts for worker liveness and readiness checks to enhance monitoring and reliability of Celery workers. - Modified task configurations to include a new CPU queue and improved task acknowledgment settings. --- .../src/components/aspect/AspectCard.tsx | 4 +- echo/frontend/src/lib/utils.ts | 8 +++ .../project/library/ProjectLibraryAspect.tsx | 3 +- echo/server/dembrane/image_utils.py | 3 +- echo/server/dembrane/quote_utils.py | 10 +++- echo/server/dembrane/tasks.py | 57 ++++++++++++++++--- echo/server/dembrane/tasks_config.py | 25 ++++---- echo/server/prod-worker-cpu.sh | 5 ++ echo/server/prod-worker-liveness.py | 17 ++++++ echo/server/prod-worker-readiness.py | 9 +++ echo/server/prod-worker.sh | 2 +- echo/server/run-worker.sh | 1 + 12 files changed, 121 insertions(+), 23 deletions(-) create mode 100644 echo/server/prod-worker-cpu.sh create mode 100755 echo/server/prod-worker-liveness.py create mode 100755 echo/server/prod-worker-readiness.py diff --git a/echo/frontend/src/components/aspect/AspectCard.tsx b/echo/frontend/src/components/aspect/AspectCard.tsx index 0fbd4c72..dd8ffde2 100644 --- a/echo/frontend/src/components/aspect/AspectCard.tsx +++ b/echo/frontend/src/components/aspect/AspectCard.tsx @@ -1,5 +1,5 @@ import { Trans } from "@lingui/react/macro"; -import { cn } from "@/lib/utils"; +import { cn, sanitizeImageUrl } from "@/lib/utils"; import { Box, Button, LoadingOverlay, Paper, Stack, Text } from "@mantine/core"; import { IconArrowsDiagonal } from "@tabler/icons-react"; import { useParams } from "react-router-dom"; @@ -47,7 +47,7 @@ export const AspectCard = ({ {project.data?.image_generation_model !== "PLACEHOLDER" && ( {data.name diff --git a/echo/frontend/src/lib/utils.ts b/echo/frontend/src/lib/utils.ts index f7881a4a..125d566a 100644 --- a/echo/frontend/src/lib/utils.ts +++ b/echo/frontend/src/lib/utils.ts @@ -33,3 +33,11 @@ export const checkPermissionError = async () => { return "error" as const; } }; + +export const sanitizeImageUrl = (url: string) => { + // interim solution to fix image urls for local development + if (url.startsWith("http://minio:9000")) { + return url.replace("http://minio:9000", "http://localhost:9000"); + } + return url; +}; diff --git a/echo/frontend/src/routes/project/library/ProjectLibraryAspect.tsx b/echo/frontend/src/routes/project/library/ProjectLibraryAspect.tsx index 83ea20b0..4f1b3add 100644 --- a/echo/frontend/src/routes/project/library/ProjectLibraryAspect.tsx +++ b/echo/frontend/src/routes/project/library/ProjectLibraryAspect.tsx @@ -16,6 +16,7 @@ import { Breadcrumbs } from "@/components/common/Breadcrumbs"; import { useMemo } from "react"; import { useCopyAspect } from "@/hooks/useCopyAspect"; import { CopyIconButton } from "@/components/common/CopyIconButton"; +import { sanitizeImageUrl } from "@/lib/utils"; const dedupeQuotes = (quotes: QuoteAspect[]): QuoteAspect[] => { const seen = new Set(); @@ -78,7 +79,7 @@ export const ProjectLibraryAspect = () => { {project.data?.image_generation_model !== "PLACEHOLDER" && ( {aspect?.name diff --git a/echo/server/dembrane/image_utils.py b/echo/server/dembrane/image_utils.py index 7b7bd469..bfec2f0a 100644 --- a/echo/server/dembrane/image_utils.py +++ b/echo/server/dembrane/image_utils.py @@ -2,6 +2,7 @@ import logging from dembrane.s3 import save_to_s3_from_url +from dembrane.utils import generate_uuid from dembrane.openai import client logger = logging.getLogger("image_utils") @@ -115,7 +116,7 @@ def generate_image(prompt: str) -> str: image_url = response.data[0].url if image_url: logger.debug("saving the image and getting the public url") - image_url = save_to_s3_from_url(image_url) + image_url = save_to_s3_from_url(image_url, "images/" + generate_uuid(), public=True) else: image_url = None except Exception as e: diff --git a/echo/server/dembrane/quote_utils.py b/echo/server/dembrane/quote_utils.py index fa0464a7..e722eec7 100644 --- a/echo/server/dembrane/quote_utils.py +++ b/echo/server/dembrane/quote_utils.py @@ -803,9 +803,17 @@ def generate_aspect_image(db: Session, aspect_id: str) -> AspectModel: try: if response: image_url = response.data[0].url + try: + image_extension = str(image_url).split(".")[-1].split("?")[0] + except Exception as e: + logger.error(f"Error getting image extension: {e}") + image_extension = "png" + if image_url: logger.debug("saving the image and getting the public url") - image_url = save_to_s3_from_url(image_url) + image_url = save_to_s3_from_url( + image_url, "images/" + generate_uuid() + "." + image_extension, public=True + ) else: image_url = None except Exception as e: diff --git a/echo/server/dembrane/tasks.py b/echo/server/dembrane/tasks.py index 3ed9f3e9..eefd481b 100644 --- a/echo/server/dembrane/tasks.py +++ b/echo/server/dembrane/tasks.py @@ -1,8 +1,10 @@ # mypy: disable-error-code="no-untyped-def" from typing import List +from pathlib import Path -from celery import Celery, chain, chord, group, signals # type: ignore +from celery import Celery, chain, chord, group, signals, bootsteps # type: ignore from sentry_sdk import capture_exception +from celery.signals import worker_ready, worker_shutdown from celery.utils.log import get_task_logger # type: ignore import dembrane.tasks_config @@ -35,6 +37,36 @@ ) from dembrane.api.stateless import generate_summary +# File for validating worker readiness +READINESS_FILE = Path("/tmp/celery_ready") + +# File for validating worker liveness +HEARTBEAT_FILE = Path("/tmp/celery_worker_heartbeat") + + +class LivenessProbe(bootsteps.StartStopStep): + requires = {"celery.worker.components:Timer"} + + def __init__(self, parent, **kwargs): + super().__init__(parent, **kwargs) + self.requests = [] + self.tref = None + + def start(self, worker): + self.tref = worker.timer.call_repeatedly( + 1.0, + self.update_heartbeat_file, + (worker,), + priority=10, + ) + + def stop(self, worker): + HEARTBEAT_FILE.unlink(missing_ok=True) + + def update_heartbeat_file(self, worker): + HEARTBEAT_FILE.touch() + + logger = get_task_logger("celery_tasks") assert REDIS_URL, "REDIS_URL environment variable is not set" @@ -53,6 +85,18 @@ celery_app.config_from_object(dembrane.tasks_config) +celery_app.steps["worker"].add(LivenessProbe) + + +@worker_ready.connect +def worker_ready(**_): + READINESS_FILE.touch() + + +@worker_shutdown.connect +def worker_shutdown(**_): + READINESS_FILE.unlink(missing_ok=True) + @signals.celeryd_init.connect def init_sentry_celery(**_kwargs): @@ -106,12 +150,7 @@ def task_transcribe_conversation_chunk(self, conversation_chunk_id: str): raise self.retry(exc=e) from e -@celery_app.task( - bind=True, - retry_backoff=True, - ignore_result=True, - base=BaseTask, -) +@celery_app.task(bind=True, retry_backoff=True, ignore_result=True, base=BaseTask) def task_transcribe_conversation_chunks(self, conversation_chunk_id: List[str]): try: task_signatures = [ @@ -135,6 +174,7 @@ def task_transcribe_conversation_chunks(self, conversation_chunk_id: List[str]): retry_backoff=True, ignore_result=False, base=BaseTask, + queue="cpu", ) def task_split_audio_chunk(self, chunk_id: str) -> List[str]: """ @@ -346,6 +386,7 @@ def task_generate_insight_extras_multiple(self, insight_ids: List[str], language retry_kwargs={"max_retries": 3}, ignore_result=False, base=BaseTask, + queue="cpu", ) def task_initialize_insights(self, project_analysis_run_id: str) -> List[str]: with DatabaseSession() as db: @@ -450,6 +491,7 @@ def task_generate_view_extras(self, view_id: str, language: str): retry_kwargs={"max_retries": 3}, ignore_result=False, base=BaseTask, + queue="cpu", ) def task_assign_aspect_centroid(self, aspect_id: str, language: str = "en"): with DatabaseSession() as db: @@ -467,6 +509,7 @@ def task_assign_aspect_centroid(self, aspect_id: str, language: str = "en"): retry_kwargs={"max_retries": 3}, ignore_result=False, base=BaseTask, + queue="cpu", ) def task_cluster_quotes_using_aspect_centroids(self, view_id: str): with DatabaseSession() as db: diff --git a/echo/server/dembrane/tasks_config.py b/echo/server/dembrane/tasks_config.py index b1fb8003..c6817302 100644 --- a/echo/server/dembrane/tasks_config.py +++ b/echo/server/dembrane/tasks_config.py @@ -3,23 +3,28 @@ enable_utc = True worker_hijack_root_logger = False +worker_prefetch_multiplier = 1 task_queues = ( - Queue("high", Exchange("high"), routing_key="high"), Queue("normal", Exchange("normal"), routing_key="normal"), - Queue("low", Exchange("low"), routing_key="low"), + Queue("cpu", Exchange("cpu"), routing_key="cpu"), ) task_default_queue = "normal" task_default_exchange = "normal" task_default_routing_key = "normal" -task_ignore_result = True +task_ignore_result = False -# TODO: configure later -# CELERY_ROUTES = { -# # -- HIGH PRIORITY QUEUE -- # -# 'myapp.tasks.check_payment_status': {'queue': 'high'}, -# # -- LOW PRIORITY QUEUE -- # -# 'myapp.tasks.close_session': {'queue': 'low'}, -# } +task_acks_late = True +task_reject_on_worker_lost = True + +broker_connection_retry = True +broker_connection_retry_on_startup = True +broker_connection_max_retries = 5 +broker_connection_timeout = 2 + +broker_transport_options = { + "visibility_timeout": 1800, + "socket_keepalive": True, +} diff --git a/echo/server/prod-worker-cpu.sh b/echo/server/prod-worker-cpu.sh new file mode 100644 index 00000000..7189b892 --- /dev/null +++ b/echo/server/prod-worker-cpu.sh @@ -0,0 +1,5 @@ +#!/bin/sh +echo "Starting worker" + +POD_HOSTNAME=$(hostname) +celery -A dembrane.tasks worker -l INFO -n worker.${POD_HOSTNAME} -Q cpu \ No newline at end of file diff --git a/echo/server/prod-worker-liveness.py b/echo/server/prod-worker-liveness.py new file mode 100755 index 00000000..23760ba0 --- /dev/null +++ b/echo/server/prod-worker-liveness.py @@ -0,0 +1,17 @@ +import sys +import time +from pathlib import Path + +LIVENESS_FILE = Path("/tmp/celery_worker_heartbeat") +if not LIVENESS_FILE.is_file(): + print("Celery liveness file NOT found.") + sys.exit(1) +stats = LIVENESS_FILE.stat() +heartbeat_timestamp = stats.st_mtime +current_timestamp = time.time() +time_diff = current_timestamp - heartbeat_timestamp +if time_diff > 60: + print("Celery Worker liveness file timestamp DOES NOT matches the given constraint.") + sys.exit(1) +print("Celery Worker liveness file found and timestamp matches the given constraint.") +sys.exit(0) diff --git a/echo/server/prod-worker-readiness.py b/echo/server/prod-worker-readiness.py new file mode 100755 index 00000000..aee37a41 --- /dev/null +++ b/echo/server/prod-worker-readiness.py @@ -0,0 +1,9 @@ +import sys +from pathlib import Path + +READINESS_FILE = Path("/tmp/celery_ready") +if not READINESS_FILE.is_file(): + print("Celery readiness file NOT found.") + sys.exit(1) +print("Celery readiness file found.") +sys.exit(0) diff --git a/echo/server/prod-worker.sh b/echo/server/prod-worker.sh index 08eb3275..a76d9944 100755 --- a/echo/server/prod-worker.sh +++ b/echo/server/prod-worker.sh @@ -2,4 +2,4 @@ echo "Starting worker" POD_HOSTNAME=$(hostname) -celery -A dembrane.tasks worker -l INFO -n worker.normal.${POD_HOSTNAME} -Q normal \ No newline at end of file +celery -A dembrane.tasks worker -l INFO -n worker.${POD_HOSTNAME} -Q normal \ No newline at end of file diff --git a/echo/server/run-worker.sh b/echo/server/run-worker.sh index 41fa31b1..78931f06 100755 --- a/echo/server/run-worker.sh +++ b/echo/server/run-worker.sh @@ -29,6 +29,7 @@ WORKER_PIDS=() # Launch the workers launch_worker "worker.normal" "normal" +launch_worker "worker.cpu" "cpu" # Launch Flower # echo "Launching Flower" From 54d882595f411f7f988916fe8c19827fd8ce1d46 Mon Sep 17 00:00:00 2001 From: Sameer Pashikanti Date: Tue, 1 Apr 2025 10:45:17 +0000 Subject: [PATCH 2/2] Refactor liveness probe task methods for clarity - Updated method parameters in the LivenessProbe class to use underscore-prefixed names for unused variables, improving code readability. - Added type ignore comments for Celery signal imports to suppress type checking warnings. --- echo/server/dembrane/tasks.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/echo/server/dembrane/tasks.py b/echo/server/dembrane/tasks.py index eefd481b..6105700f 100644 --- a/echo/server/dembrane/tasks.py +++ b/echo/server/dembrane/tasks.py @@ -4,7 +4,7 @@ from celery import Celery, chain, chord, group, signals, bootsteps # type: ignore from sentry_sdk import capture_exception -from celery.signals import worker_ready, worker_shutdown +from celery.signals import worker_ready, worker_shutdown # type: ignore from celery.utils.log import get_task_logger # type: ignore import dembrane.tasks_config @@ -60,10 +60,10 @@ def start(self, worker): priority=10, ) - def stop(self, worker): + def stop(self, _worker): HEARTBEAT_FILE.unlink(missing_ok=True) - def update_heartbeat_file(self, worker): + def update_heartbeat_file(self, _worker): HEARTBEAT_FILE.touch() @@ -88,12 +88,12 @@ def update_heartbeat_file(self, worker): celery_app.steps["worker"].add(LivenessProbe) -@worker_ready.connect +@worker_ready.connect # type: ignore def worker_ready(**_): READINESS_FILE.touch() -@worker_shutdown.connect +@worker_shutdown.connect # type: ignore def worker_shutdown(**_): READINESS_FILE.unlink(missing_ok=True)